feat: Add recursive CTE method for Departemen model and update views for enhanced data retrieval

1 parent 2d8efb28
from sqlalchemy import (Column, Integer, ForeignKey, String, SmallInteger) from sqlalchemy import (Column, Integer, ForeignKey, String, SmallInteger, text)
from sqlalchemy.orm import (relationship, backref, declared_attr) from sqlalchemy.orm import (relationship, backref, declared_attr)
from ..models import DBSession, Base from ..models import DBSession, Base
from ..models import (NamaModel, TABLE_ARGS) from ..models import (NamaModel, TABLE_ARGS)
...@@ -46,5 +46,64 @@ class _Departemen(NamaModel): ...@@ -46,5 +46,64 @@ class _Departemen(NamaModel):
def get_list(cls): def get_list(cls):
return DBSession.query(cls.id, cls.nama).order_by(cls.nama).all() return DBSession.query(cls.id, cls.nama).order_by(cls.nama).all()
@classmethod
def cte_get(cls, search=None, **kwargs):
# tahun = kwargs.get('tahun', self.req.params.get(
# 'tahun', datetime.datetime.now().year-1))
parent_id = kwargs.get('id', None)
str_where = parent_id and " parent_id={} ".format(
parent_id) or " parent_id IS NULL"
sql = """
WITH RECURSIVE dep_tree AS (
SELECT
id,
kode,
nama,
parent_id,
status,
0 AS level,
ARRAY[id] AS path
FROM
public.departemen
WHERE
{str_where}
UNION ALL
SELECT
c.id,
c.kode,
c.nama,
c.parent_id,
c.status,
ct.level + 1,
ct.path || c.id
FROM
public.departemen c
JOIN
dep_tree ct ON c.parent_id = ct.id
)
SELECT
REPEAT(' ', level) || nama AS hierarchy, -- Indent names based on level
id,
kode,
nama,
parent_id,
status,
level,
path
FROM
dep_tree
""".format(str_where=str_where)
if search:
sql = f"{sql} WHERE nama ILIKE '%{search}%' "
sql=sql+"""ORDER BY path;"""
return cls.db_session.execute(text(sql)).fetchall()
class Departemen(_Departemen, Base): class Departemen(_Departemen, Base):
db_session = DBSession db_session = DBSession
...@@ -106,7 +106,6 @@ class Views(BaseView): ...@@ -106,7 +106,6 @@ class Views(BaseView):
self.add_schema = AddSchema self.add_schema = AddSchema
self.edit_schema = EditSchema self.edit_schema = EditSchema
self.table = Departemen self.table = Departemen
# self.list_url = 'departemen'
self.list_route = 'base-departemen' self.list_route = 'base-departemen'
self.form_scripts = "" self.form_scripts = ""
self.list_buttons = self.list_buttons + (btn_upload,) self.list_buttons = self.list_buttons + (btn_upload,)
...@@ -170,26 +169,28 @@ class Views(BaseView): ...@@ -170,26 +169,28 @@ class Views(BaseView):
values["parent_id"] = None values["parent_id"] = None
row = super().save_request(values, row) row = super().save_request(values, row)
return row return row
def view_act(self):
request = self.req
params = request.params
url_dict = request.matchdict
if url_dict['act'] == 'grid':
query = Departemen.cte_get()
data = [{"id": d.id, "kode": d.kode, "nama": d.nama, "status": d.status,
"level_id":d.level, "parent_id": d.parent_id} for d in query]
return {
"data": data}
else:
return self.next_act()
# @view_config(route_name='departemen-view',
# renderer='templates/form.pt', permission='departemen')
# def view_view(self):
# return super(ViewDepartemen, self).view_view()
# @view_config(route_name='departemen',
# renderer='templates/table.pt',
# permission='departemen')
# def view_list(self):
# return super().view_list()
# @view_config(route_name='departemen-act', renderer='json',
# permission='view')
def next_act(self): def next_act(self):
request = self.req request = self.req
params = request.params params = request.params
url_dict = request.matchdict url_dict = request.matchdict
if url_dict['act'] == 'hon': if url_dict['act'] == 'hon':
term = params.get('term', '') term = params.get('term', '')
# q = Departemen.cte_get(search=term)
q = DBSession.query(Departemen). \ q = DBSession.query(Departemen). \
filter(Departemen.status == 1, filter(Departemen.status == 1,
Departemen.nama.ilike(f'%{term}%')) \ Departemen.nama.ilike(f'%{term}%')) \
...@@ -197,166 +198,12 @@ class Views(BaseView): ...@@ -197,166 +198,12 @@ class Views(BaseView):
rows = q.all() rows = q.all()
r = [] r = []
for k in rows: for k in rows:
d = dict(id=k.id, value=k.nama, kode=k.kode, nama=k.nama, d = dict(id=k.id, value=f"{k.kode}:{k.nama}", kode=k.kode, nama=k.nama,
level_id=k.level_id) level_id=k.level_id)
r.append(d) r.append(d)
return r return r
# dep_alias = aliased(Departemen)
# if url_dict['act'] == 'grid':
# columns = [ColumnDT(Departemen.id, mData='id'),
# ColumnDT(Departemen.kode, mData='kode'),
# ColumnDT(Departemen.nama, mData='nama'),
# ColumnDT(dep_alias.nama, mData='parent'),
# ColumnDT(Departemen.status, mData='status'),
# ColumnDT(Departemen.level_id, mData='level_id'),
# ColumnDT(ResCompany.nama, mData='company_nm'), ]
# query = DBSession.query().select_from(Departemen).outerjoin(
# dep_alias, Departemen.parent_id == dep_alias.id).outerjoin(
# ResCompany, self.table.company_id == ResCompany.id
# )
# query = self.filter_company(query)
# row_table = DataTables(request.GET, query, columns)
# return row_table.output_result()
# el
# elif url_dict['act'] == 'honk':
# term = 'term' in params and params['term'] or ''
# q = DBSession.query(Departemen) \
# .filter(Departemen.status == 1,
# func.concat(Departemen.nama, ';',
# Departemen.kode)
# .ilike('%%%s%%' % term)) \
# .order_by(Departemen.nama)
# q = self.filter_company(q)
# rows = q.all()
# r = []
# for k in rows:
# d = dict(id=k.id, value=k.nama + ';' + k.kode, kode=k.kode,
# nama=k.nama, level_id=k.level_id)
# r.append(d)
# return r
# elif url_dict['act'] == 'hon_level':
# # todo Check ulang untuk hon
# term = 'term' in params and params['term'] or ''
# settings = get_settings()
# level_id = get_params('departemen_chg_id', 0)
# q = DBSession.query(Departemen).filter(Departemen.status == 1,
# Departemen.nama.ilike(
# '%%%s%%' %
# term)).order_by(
# Departemen.nama)
# if self.req.user.company_id:
# q = q.filter(Departemen.company_id == self.req.user.company_id)
# if int(level_id) > 0:
# q = q.filter(Departemen.level_id == int(level_id))
# if request.user.id > 1 and not request.has_permission(
# "departemen-all"):
# partner = Partner.query_id(request.user.id).first()
# if partner:
# PartnerDepartemen.query_jabatan(partner.id, datetime.now())
# user_dep = PartnerDepartemen.query_user_id().first()
# if not user_dep:
# return []
# kode = user_dep.departemen.kode
# if user_dep.sub_departemen:
# q = q.filter(Departemen.kode.ilike('{}%'.format(kode)))
# else:
# q = q.filter(Departemen.kode == kode)
# rows = q.all()
# r = []
# for k in rows:
# d = dict(id=k.id, value=k.nama, kode=k.kode, nama=k.nama,
# level_id=k.level_id)
# r.append(d)
# return r
# elif url_dict['act'] == 'hon_all':
# term = 'term' in params and params['term'] or ''
# settings = get_settings()
# level_id = 'departemen_chg_id' in settings and settings[
# 'departemen_chg_id'] or 0
# q = DBSession.query(Departemen).filter(Departemen.status == 1,
# Departemen.nama.ilike(
# '%%%s%%' %
# term)).order_by(
# Departemen.nama)
# if self.req.user.company_id:
# q = q.filter(Departemen.company_id == self.req.user.company_id)
# if int(level_id) > 0:
# q = q.filter(Departemen.level_id == int(level_id))
# rows = q.all()
# r = []
# for k in rows:
# d = dict(id=k.id, value=k.nama, kode=k.kode, nama=k.nama,
# level_id=k.level_id)
# r.append(d)
# return r
# def get_bindings(self, row=None):
# return {"company_list": ResCompany.get_list()}
# @view_config(route_name='departemen-add', renderer='templates/form.pt',
# permission='departemen')
# def view_add(self):
# return super(ViewDepartemen, self).view_add()
# @view_config(route_name='departemen-edit',
# renderer='templates/form.pt', permission='departemen')
# def view_edit(self):
# return super(ViewDepartemen, self).view_edit()
# @view_config(route_name='departemen-delete',
# renderer='templates/form.pt', permission='departemen')
# def view_delete(self):
# return super(ViewDepartemen, self).view_delete()
# @view_config(route_name='departemen-upload',
# renderer='templates/form.pt',
# permission='departemen')
# def view_upload(self):
# return super().view_upload(exts=('.csv', '.tsv'), delimiter="\t")
# request = self.req
# form = self.get_form(UploadSchema)
# if request.POST:
# if 'save' in request.POST:
# input_file = request.POST['upload'].file
# filename = request.POST['upload'].filename
# ext = get_ext(filename)
# if ext.lower() != '.csv':
# request.session.flash('File harus format csv', 'error')
# return dict(form=form.render())
# if not input_file:
# return dict(form=form.render())
# input_file.seek(0)
# temp_file_path = '/tmp/' + get_random_string(10) + '.csv'
#
# with open(temp_file_path, 'wb') as output_file:
# shutil.copyfileobj(input_file, output_file)
#
# with open(temp_file_path) as f:
# c = csv.DictReader(f)
# for csv_row in c:
# kode = csv_row['kode']
# if kode:
# xcode = kode.split(".")
# for r in range(len(xcode)):
# xc = xcode[r] and int(xcode[r])
# if not xc and type(xc) == int:
# code = ""
# for t in range(r):
# code += xcode[t] + '.'
#
# if code:
# code = code[:-1]
# self.save_upload(code, csv_row)
#
# self.save_upload(kode, csv_row)
#
# DBSession.flush()
# os.remove(temp_file_path)
#
# return self.route_list()
# return dict(form=form.render())
def get_values(self, row, values=None): def get_values(self, row, values=None):
if not values: if not values:
......
...@@ -14,8 +14,8 @@ class ListSchema(colander.Schema): ...@@ -14,8 +14,8 @@ class ListSchema(colander.Schema):
colander.String(),) colander.String(),)
ruang = colander.SchemaNode( ruang = colander.SchemaNode(
colander.String(),) colander.String(),)
tunjangan = colander.SchemaNode( # tunjangan = colander.SchemaNode(
colander.Integer(),) # colander.Integer(),)
status = colander.SchemaNode( status = colander.SchemaNode(
colander.Integer(), widget=widget.CheckboxWidget(),) colander.Integer(), widget=widget.CheckboxWidget(),)
......
...@@ -11,8 +11,7 @@ from pyramid.i18n import TranslationStringFactory ...@@ -11,8 +11,7 @@ from pyramid.i18n import TranslationStringFactory
from ..views import BaseView from ..views import BaseView
_ = TranslationStringFactory("opensipkd") _ = TranslationStringFactory("opensipkd")
SESS_ADD_FAILED = 'Tambah jabatan gagal'
SESS_EDIT_FAILED = 'Edit jabatan gagal'
JENIS = ((1, _('structural', default='Structural')), JENIS = ((1, _('structural', default='Structural')),
(2, _('functional', default='Functional')), (2, _('functional', default='Functional')),
(3, _('finance', default='Finance')), (3, _('finance', default='Finance')),
...@@ -89,8 +88,7 @@ class Views(BaseView): ...@@ -89,8 +88,7 @@ class Views(BaseView):
def __init__(self, request): def __init__(self, request):
super().__init__(request) super().__init__(request)
self.form_params = dict(scripts="") self.form_params = dict(scripts="")
self.list_url = 'jabatan' self.list_route = 'base-jabatan'
self.list_route = 'jabatan'
self.add_schema = AddSchema self.add_schema = AddSchema
self.edit_schema = EditSchema self.edit_schema = EditSchema
self.table = Jabatan self.table = Jabatan
......
...@@ -14,15 +14,18 @@ class AddSchema(colander.Schema): ...@@ -14,15 +14,18 @@ class AddSchema(colander.Schema):
nama_widget = widget.AutocompleteInputWidget( nama_widget = widget.AutocompleteInputWidget(
size=60, size=60,
min_length=2, min_length=2,
style="z-index: 100000 !important;") # style="z-index: 100001 !important;"
)
departemen_widget = widget.AutocompleteInputWidget( departemen_widget = widget.AutocompleteInputWidget(
size=60, size=60,
min_length=2, min_length=2,
style="z-index: 100001 !important;") # style="z-index: 100000 !important;"
)
jabatan_widget = widget.AutocompleteInputWidget( jabatan_widget = widget.AutocompleteInputWidget(
size=60, size=60,
min_length=2, min_length=2,
style="z-index: 99999 !important;") # style="z-index: 99999 !important;"
)
partner_id = colander.SchemaNode( partner_id = colander.SchemaNode(
colander.Integer(), colander.Integer(),
oid="partner_id", oid="partner_id",
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!