feat: Add TestModel and related API views, forms, and templates

1 parent be0b4c22
......@@ -122,3 +122,4 @@ alembic_local.ini
.vs*
activate
logo.png
.vscode/
\ No newline at end of file
"""create test
Revision ID: 8de8d8168688
Revises: 10a68b1510a6
Create Date: 2026-01-29 15:58:55.599525
"""
# revision identifiers, used by Alembic.
revision = '8de8d8168688'
down_revision = '10a68b1510a6'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('test_model',
sa.Column('description', sa.String(length=256), nullable=True),
sa.Column('nama', sa.String(length=128), nullable=False),
sa.Column('kode', sa.String(length=32), nullable=False),
sa.Column('status', sa.SmallInteger(), nullable=False),
sa.Column('created', sa.DateTime(), nullable=True),
sa.Column('updated', sa.DateTime(), nullable=True),
sa.Column('create_uid', sa.Integer(), nullable=True),
sa.Column('update_uid', sa.Integer(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('pk_test_model')),
schema='public'
)
op.drop_constraint(op.f('fk_departemen_parent_id_departemen'), 'departemen', type_='foreignkey')
op.create_foreign_key(op.f('fk_departemen_parent_id_departemen'), 'departemen', 'departemen', ['parent_id'], ['id'], source_schema='public', referent_schema='public')
op.drop_constraint(op.f('fk_jabatan_eselon_id_eselon'), 'jabatan', type_='foreignkey')
op.create_foreign_key(op.f('fk_jabatan_eselon_id_eselon'), 'jabatan', 'eselon', ['eselon_id'], ['id'], source_schema='public', referent_schema='public')
op.drop_constraint(op.f('fk_partner_provinsi_id_res_provinsi'), 'partner', type_='foreignkey')
op.drop_constraint(op.f('fk_partner_desa_id_res_desa'), 'partner', type_='foreignkey')
op.drop_constraint(op.f('fk_partner_dati2_id_res_dati2'), 'partner', type_='foreignkey')
op.drop_constraint(op.f('fk_partner_kecamatan_id_res_kecamatan'), 'partner', type_='foreignkey')
op.create_foreign_key(op.f('fk_partner_desa_id_res_desa'), 'partner', 'res_desa', ['desa_id'], ['id'], referent_schema='public')
op.create_foreign_key(op.f('fk_partner_dati2_id_res_dati2'), 'partner', 'res_dati2', ['dati2_id'], ['id'], referent_schema='public')
op.create_foreign_key(op.f('fk_partner_kecamatan_id_res_kecamatan'), 'partner', 'res_kecamatan', ['kecamatan_id'], ['id'], referent_schema='public')
op.create_foreign_key(op.f('fk_partner_provinsi_id_res_provinsi'), 'partner', 'res_provinsi', ['provinsi_id'], ['id'], referent_schema='public')
op.drop_constraint(op.f('fk_partner_departemen_partner_id_partner'), 'partner_departemen', type_='foreignkey')
op.drop_constraint(op.f('fk_partner_departemen_jabatan_id_jabatan'), 'partner_departemen', type_='foreignkey')
op.drop_constraint(op.f('fk_partner_departemen_departemen_id_departemen'), 'partner_departemen', type_='foreignkey')
op.create_foreign_key(op.f('fk_partner_departemen_partner_id_partner'), 'partner_departemen', 'partner', ['partner_id'], ['id'], source_schema='public')
op.create_foreign_key(op.f('fk_partner_departemen_jabatan_id_jabatan'), 'partner_departemen', 'jabatan', ['jabatan_id'], ['id'], source_schema='public', referent_schema='public')
op.create_foreign_key(op.f('fk_partner_departemen_departemen_id_departemen'), 'partner_departemen', 'departemen', ['departemen_id'], ['id'], source_schema='public', referent_schema='public')
op.drop_constraint(op.f('fk_res_dati2_provinsi_id_res_provinsi'), 'res_dati2', type_='foreignkey')
op.create_foreign_key(op.f('fk_res_dati2_provinsi_id_res_provinsi'), 'res_dati2', 'res_provinsi', ['provinsi_id'], ['id'], source_schema='public', referent_schema='public')
op.drop_constraint(op.f('fk_res_desa_kecamatan_id_res_kecamatan'), 'res_desa', type_='foreignkey')
op.create_foreign_key(op.f('fk_res_desa_kecamatan_id_res_kecamatan'), 'res_desa', 'res_kecamatan', ['kecamatan_id'], ['id'], source_schema='public', referent_schema='public')
op.drop_constraint(op.f('fk_res_kecamatan_dati2_id_res_dati2'), 'res_kecamatan', type_='foreignkey')
op.create_foreign_key(op.f('fk_res_kecamatan_dati2_id_res_dati2'), 'res_kecamatan', 'res_dati2', ['dati2_id'], ['id'], source_schema='public', referent_schema='public')
op.drop_constraint(op.f('fk_user_area_desa_id_res_desa'), 'user_area', type_='foreignkey')
op.create_foreign_key(op.f('fk_user_area_desa_id_res_desa'), 'user_area', 'res_desa', ['desa_id'], ['id'], referent_schema='public')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(op.f('fk_user_area_desa_id_res_desa'), 'user_area', type_='foreignkey')
op.create_foreign_key(op.f('fk_user_area_desa_id_res_desa'), 'user_area', 'res_desa', ['desa_id'], ['id'])
op.drop_constraint(op.f('fk_res_kecamatan_dati2_id_res_dati2'), 'res_kecamatan', schema='public', type_='foreignkey')
op.create_foreign_key(op.f('fk_res_kecamatan_dati2_id_res_dati2'), 'res_kecamatan', 'res_dati2', ['dati2_id'], ['id'])
op.drop_constraint(op.f('fk_res_desa_kecamatan_id_res_kecamatan'), 'res_desa', schema='public', type_='foreignkey')
op.create_foreign_key(op.f('fk_res_desa_kecamatan_id_res_kecamatan'), 'res_desa', 'res_kecamatan', ['kecamatan_id'], ['id'])
op.drop_constraint(op.f('fk_res_dati2_provinsi_id_res_provinsi'), 'res_dati2', schema='public', type_='foreignkey')
op.create_foreign_key(op.f('fk_res_dati2_provinsi_id_res_provinsi'), 'res_dati2', 'res_provinsi', ['provinsi_id'], ['id'])
op.drop_constraint(op.f('fk_partner_departemen_departemen_id_departemen'), 'partner_departemen', schema='public', type_='foreignkey')
op.drop_constraint(op.f('fk_partner_departemen_jabatan_id_jabatan'), 'partner_departemen', schema='public', type_='foreignkey')
op.drop_constraint(op.f('fk_partner_departemen_partner_id_partner'), 'partner_departemen', schema='public', type_='foreignkey')
op.create_foreign_key(op.f('fk_partner_departemen_departemen_id_departemen'), 'partner_departemen', 'departemen', ['departemen_id'], ['id'])
op.create_foreign_key(op.f('fk_partner_departemen_jabatan_id_jabatan'), 'partner_departemen', 'jabatan', ['jabatan_id'], ['id'])
op.create_foreign_key(op.f('fk_partner_departemen_partner_id_partner'), 'partner_departemen', 'partner', ['partner_id'], ['id'])
op.drop_constraint(op.f('fk_partner_provinsi_id_res_provinsi'), 'partner', type_='foreignkey')
op.drop_constraint(op.f('fk_partner_kecamatan_id_res_kecamatan'), 'partner', type_='foreignkey')
op.drop_constraint(op.f('fk_partner_dati2_id_res_dati2'), 'partner', type_='foreignkey')
op.drop_constraint(op.f('fk_partner_desa_id_res_desa'), 'partner', type_='foreignkey')
op.create_foreign_key(op.f('fk_partner_kecamatan_id_res_kecamatan'), 'partner', 'res_kecamatan', ['kecamatan_id'], ['id'])
op.create_foreign_key(op.f('fk_partner_dati2_id_res_dati2'), 'partner', 'res_dati2', ['dati2_id'], ['id'])
op.create_foreign_key(op.f('fk_partner_desa_id_res_desa'), 'partner', 'res_desa', ['desa_id'], ['id'])
op.create_foreign_key(op.f('fk_partner_provinsi_id_res_provinsi'), 'partner', 'res_provinsi', ['provinsi_id'], ['id'])
op.drop_constraint(op.f('fk_jabatan_eselon_id_eselon'), 'jabatan', schema='public', type_='foreignkey')
op.create_foreign_key(op.f('fk_jabatan_eselon_id_eselon'), 'jabatan', 'eselon', ['eselon_id'], ['id'])
op.drop_constraint(op.f('fk_departemen_parent_id_departemen'), 'departemen', schema='public', type_='foreignkey')
op.create_foreign_key(op.f('fk_departemen_parent_id_departemen'), 'departemen', 'departemen', ['parent_id'], ['id'])
op.drop_table('test_model', schema='public')
# ### end Alembic commands ###
import logging
from datetime import datetime
from opensipkd.base.models.meta import Base
import ziggurat_foundations.models
from opensipkd.tools import as_timezone
from opensipkd.tools.upload import append_csv
......@@ -99,7 +100,9 @@ class DefaultModel(CommonModel):
def save(cls, values, row=None, **kwargs):
if not row:
row = cls()
row.from_dict(values)
for k, v in values.items():
if hasattr(row, k):
setattr(row, k, v)
return row
@classmethod
......@@ -155,11 +158,26 @@ class DefaultModel(CommonModel):
class StandarModel(DefaultModel):
status = Column(SmallInteger, nullable=False, default=0)
created = Column(DateTime, nullable=True, default=datetime.utcnow)
created = Column(DateTime, nullable=True, default=datetime.now().astimezone())
updated = Column(DateTime, nullable=True)
create_uid = Column(Integer, nullable=True, default=1)
update_uid = Column(Integer, nullable=True)
@classmethod
def save(cls, values, row=None, **kwargs):
user = kwargs.get("user", None)
if not row:
values['created'] = datetime.now().astimezone()
if user:
values['create_uid'] = user.id
status = values.get('status', None)
if status is None:
values['status'] = 0
else:
values['updated'] = datetime.now().astimezone()
if user:
values['update_uid'] = user.id
return super().save(values, row, **kwargs)
# New Method
@classmethod
def query_status(cls, status=0, db_session=None):
......@@ -283,3 +301,8 @@ class NamaModel(KodeModel):
if not db_session:
db_session = cls.db_session
return cls.query_list(db_session=db_session).all()
class TestModel(NamaModel, Base):
__tablename__ = 'test_model'
__table_args__ = TABLE_ARGS
description = Column(String(256), nullable=True)
\ No newline at end of file
from calendar import c
from datetime import datetime
from click import group
......@@ -178,6 +179,24 @@ class _User(UserMixin, BaseModel):
perm_names.append(gp.perm_name)
return perm_names
@classmethod
def get_by_objek(cls, user: object):
"""Berguna jika user berbeda database dengan database session.
Misal user dari database opensipkd.base.models.users.User"""
if not user:
raise Exception("Parameter user is None")
resp = User.get_by_email(user.email)
if not resp:
resp = User.get_by_name(user.user_name)
return resp
@classmethod
def get_by_request(cls, request_user: object):
"""Berguna jika user berbeda database dengan database session.
Misal user dari database opensipkd.base.models.users.User"""
return cls.get_by_objek(request_user)
# @classmethod
# def get_departemen_id(cls, user_id):
# partner = Partner.query_user_id(user_id).first()
......
......@@ -107,3 +107,10 @@ base-eselon-add,/eselon/add,base,eselon,,,,admin,base-eselon,Eselon Add,1,0,,,fo
base-eselon-edit,/eselon/{id}/edit,base,eselon,,,,admin,base-eselon,Eselon Edit,1,0,,,form6.pt,
base-eselon-view,/eselon/{id}/view,base,eselon,,,,admin,base-eselon,Eselon View,1,0,,,form6.pt,
base-eselon-delete,/eselon/{id}/delete,base,eselon,,,,admin,base-eselon,Eselon Delete,1,0,,,form6.pt,
base-xhr-test,/xhr/test,,xhr_test,,view_list,,admin,base-admin,Test,1,0,,1,test_list.pt
base-xhr-test-act,/xhr/test/{act}/act,base,xhr_test,,,,admin,base-test,Test Action,1,0,,,json,
base-xhr-test-add,/xhr/test/add,base,xhr_test,,,,admin,base-test,Test Add,1,0,,,test_form.pt,
base-xhr-test-edit,/xhr/test/{id}/edit,base,xhr_test,,,,admin,base-test,Test Edit,1,0,,,test_form.pt,
base-xhr-test-view,/xhr/test/{id}/view,base,xhr_test,,,,admin,base-test,Test View,1,0,,,test_form.pt,
base-xhr-test-delete,/xhr/test/{id}/delete,base,xhr_test,,,,admin,base-test,Test Delete,1,0,,,test_form.pt,
base-xhr-test-api,/xhr/test/api,,xhr_test,ViewsApi,,,admin,,API Test,1,2,,,
\ No newline at end of file
from datetime import datetime, date
import logging
import colander
from decimal import Decimal
from deform import Form, ValidationFailure
from opensipkd.models import User
from math import e, exp, log
from pyramid.response import Response
from opensipkd.base.models import DBSession
from deform.widget import SelectWidget
from pyramid.response import Response
from pyramid.httpexceptions import *
import colander
from deform import Form, ValidationFailure
from deform.widget import SelectWidget
from opensipkd.base.models import DBSession
from opensipkd.tools.buttons import btn_save, btn_cancel
from opensipkd.tools import get_settings
from opensipkd.models import User
from opensipkd.base.views.common import DataTables, ColumnDT
from . import api_messages
from ..tools import obj2json
......@@ -25,6 +26,7 @@ class ApiViews(APIView):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.req = kwargs.get("request", None)
self.user = User
self.db_session = DBSession
self.buttons = (btn_save, btn_cancel)
self.table = None
......@@ -162,9 +164,8 @@ class ApiViews(APIView):
return data
except ValidationFailure as e:
_log.error("Error validasi %s", str(e.error.asdict()))
raise HTTPBadRequest(explanation=str(e.error.asdict())) from e
raise HTTPNotAcceptable(json_body=str(e.error.asdict())) from e
return dict(data)
def get_form(self, class_form, row=None, buttons=(btn_save, btn_cancel),
**kwargs):
......@@ -240,38 +241,62 @@ class ApiViews(APIView):
d = self.get_custom_render(d)
return Response(json=json.loads(json.dumps(d, default=self.json_adapter)))
def post_data(self, *args, **kwargs):
# digunakan untuk mengambil data post dari request
try:
data = self.req.json_body.items()
except Exception:
data = self.req.POST.items()
return data
def post(self, request, *args, **kwargs):
self.req = request
return self._update()
def _update(self, id_=None):
form = self.get_form(self.add_schema, validator=self.form_validator)
# TODO: harus mengakomodir data dari json juga
controls = self.req.POST.items()
try:
controls = self.req.json_body.items()
except Exception:
controls = self.req.POST.items()
data = self.form_validate(form, controls)
except ValidationFailure as e:
return Response(json={"errors": e.error.asdict()}, status=400)
if data.get("id", None):
row = self.table.query.get(data.get("id"))
# Response(json={"errors": e.error.asdict()}, status=400)
if id_:
row = self.table.query().get(id_)
else:
row = None
user = User.get_by_request(self.request.user)
user = self.user.get_by_request(self.req.user)
row = self.table.save(data, row, user=user)
self.db_session.add(row)
self.db_session.flush()
d = row.__dict__
d.pop("_sa_instance_state", None)
d.pop("db_session", None)
return Response(json=self.success(d))
def delete(self):
query = self.db_session.query(self.table)
# query = self.filter_ids(query)
row = query.first()
if not row:
return HTTPNotFound()
def put(self, request, *args, **kwargs):
data = request.json_body
id_ = data.get("id") or self.req.matchdict.get("id")
if not id_:
return HTTPBadRequest(explanation="ID is required")
return self._update(id_)
return Response(json=self.success())
def delete(self, request, *args, **kwargs):
self.req = request
data = self.req.json_body
if "id" not in data:
return HTTPBadRequest(explanation="ID is required")
query = self.table.query().filter_by(id=data.get("id"))
if not query.first():
return HTTPNotFound(explanation="Data not found")
query.delete()
self.db_session.flush()
self.req.session.flash("Data deleted")
return Response(json=self.success({}, msg={"message": "Data deleted"}))
def put(self, data):
self.req = data
return self.req
def patch(self, data):
self.req = data
......
<html metal:use-macro="load: ./base3.1.pt" tal:define="
scripts scripts|scripts;
readonly readonly|readonly;">
<div metal:fill-slot="content">
<div class="panel panel-default">
<div class="panel-heading">
<h3 class="panel-title"><i class="fa fa-fw fa-plus"></i>&nbsp;${request.title}</h3>
</div>
<div class="panel-body">
<div tal:content="structure form"></div>
</div>
</div>
</div>
<div metal:fill-slot="scripts">
<script>
$(document).ready(function () {
$(".readonly").attr("readonly", true);
$(".date").attr("readonly", true);
function convertFormToJSON(form) {
const array = $(form).serializeArray(); // Encodes form elements as an array of names and values.
const json = {};
$.each(array, function () {
// Handle multiple inputs with the same name (e.g., checkboxes)
if (json[this.name]) {
if (!json[this.name].push) {
json[this.name] = [json[this.name]];
}
json[this.name].push(this.value || '');
} else {
json[this.name] = this.value || '';
}
});
return json;
}
var submitType = 'POST';
$('form button[type="submit"]').on('click', function () {
// Remove any previously added hidden inputs for this form
if ($(this).val() === 'cancel') {
location.href = "${request.route_url('base-xhr-test')}";
return false;
} else if ($(this).val() === 'delete') {
submitType = 'DELETE';
}
});
$('form').on('submit', function (event) {
event.preventDefault();
console.log('Form submitted!');
var id = "${request.matchdict.get('id', '')}";
const formObject = convertFormToJSON(this);
if (id) {
formObject['id'] = id;
}
var message = '';
if (submitType == 'POST' && id) {
submitType = 'PUT';
message = 'Mengubah';
} else if (submitType === 'DELETE') {
message = 'Menghapus';
} else {
message = 'Menambah';
}
// Convert the JavaScript object to a JSON string
const formData = JSON.stringify(formObject);
var url = "${request.route_url('base-xhr-test-api')}";
$.ajax({
url: url,
type: submitType,
data: formData,
contentType: 'application/json',
success: function (response) {
console.log('Success:', response);
if (submitType === 'DELETE') {
location.href = "${request.route_url('base-xhr-test')}";
return;
}
data = response.data;
if (Array.isArray(data)) {
data = data[0];
}
id = data.id;
$("#success").html('Sukses: ' + message + ' data ID ' + id);
$("#success").show();
},
error: function (error) {
if (error.code === 404) {
location.href = "${request.route_url('base-xhr-test')}";
return;
}
console.error('Error:', error);
$("#errors").html('Error: ' + message + ' ' + error.responseText);
$("#errors").show();
}
});
});
${structure:scripts}
});
</script>
<div metal:define-slot="scripts"></div>
</div>
</html>
\ No newline at end of file
<html metal:use-macro="load: ./base3.1.pt" tal:define="
scripts scripts|scripts;
readonly readonly|readonly;">
scripts scripts|scripts;">
<div metal:fill-slot="content">
<div class="panel panel-default">
<div class="panel-heading">
<h3 class="panel-title"><i class="fa fa-fw fa-plus"></i>&nbsp;${request.title}</h3>
</div>
<div class="panel-body">
<div tal:content="structure form.render(readonly=readonly)"></div>
<div tal:content="structure form"></div>
</div>
</div>
</div>
......
import colander
from deform import (widget, )
from opensipkd.base.models import TestModel
from . import base_views, api_base
class AddSchema(colander.Schema):
kode = colander.SchemaNode(colander.String(),
title="Kode",
widget=widget.TextInputWidget())
nama = colander.SchemaNode(colander.String(),
title="Nama",
widget=widget.TextInputWidget())
description = colander.SchemaNode(colander.String(),
title="Description",
widget=widget.TextInputWidget())
id = colander.SchemaNode(colander.String(),
missing=colander.drop,
widget=widget.HiddenWidget())
class ListSchema(colander.Schema):
id = colander.SchemaNode(colander.Integer(), title="Action")
kode = colander.SchemaNode(colander.String(),
title="Kode",
widget=widget.TextInputWidget())
nama = colander.SchemaNode(colander.String(),
title="Nama",
widget=widget.TextInputWidget())
class Views(base_views.BaseView):
def __init__(self, request):
super().__init__(request)
self.list_schema = ListSchema
self.table = TestModel
self.add_schema = AddSchema
self.edit_schema = AddSchema
self.list_route = 'base-xhr-test'
class ViewsApi(api_base.ApiViews):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.table = TestModel
self.list_schema = ListSchema
self.add_schema = AddSchema
self.edit_schema = AddSchema
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!