Skip to content
Toggle navigation
Projects
Groups
Snippets
Help
aa.gusti
/
opensipkd-base
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Settings
Activity
Graph
Charts
Create a new issue
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit 1629d3c5
authored
Mar 11, 2025
by
aa.gustiana@gmail.com
Browse Files
Options
Browse Files
Tag
Download
Email Patches
Plain Diff
perubahan dari view_upload ke view_import upload dan import
1 parent
2202f042
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
174 additions
and
97 deletions
opensipkd/base/__init__.py
opensipkd/base/views/base_views.py
pyproject.toml
opensipkd/base/__init__.py
View file @
1629d3c
...
...
@@ -5,10 +5,7 @@ import locale
import
logging
from
sqlalchemy
import
or_
import
re
from
.routes
import
routes
try
:
from
urllib
import
(
urlencode
,
quote
,
quote_plus
,
)
except
ImportError
:
...
...
@@ -156,11 +153,10 @@ def add_global(event):
event
[
'get_params'
]
=
get_params
event
[
'get_urls'
]
=
get_urls
event
[
'get_csrf_token'
]
=
get_csrf_token
event
[
'get_base_menus'
]
=
BASE_CLASS
.
get_menus
()
# event['get_params'] = get_params
# event['get_module_menus'] = get_module_menus
# event['get_module_submenus'] = get_module_submenus
event
[
'get_base_menus'
]
=
BASE_CLASS
.
get_menus
()
# def get_params(request, params, alternate=None, settings=None):
# return get_params(params, alternate, settings)
...
...
@@ -223,7 +219,7 @@ def get_ini_params(request, params=None, alternate=None, settings=None):
def
get_id_card_folder
(
ext
=
None
,
settings
=
None
):
_logging
.
debug
(
'get_id_card_folder'
)
idcard_files
=
get_params
(
"partner_idcard_folder"
,
'/tmp/idcard'
,
settings
=
settings
)
'/tmp/idcard
/
'
,
settings
=
settings
)
if
ext
:
idcard_files
+=
ext
# if ext and os.sep != '/':
...
...
opensipkd/base/views/base_views.py
View file @
1629d3c
...
...
@@ -42,6 +42,75 @@ class BaseView(object):
self
.
db_session
=
DBSession
self
.
params
=
self
.
req
.
params
self
.
settings
=
get_settings
()
self
.
tahun
=
None
self
.
bulan
=
None
self
.
posted
=
False
self
.
awal
=
None
self
.
akhir
=
None
self
.
dt_awal
=
None
self
.
dt_akhir
=
None
self
.
tahun_awal
=
None
self
.
tahun_akhir
=
None
self
.
departemen_kd
=
None
self
.
departemen_nm
=
None
self
.
departemen_id
=
None
self
.
jenis
=
None
self
.
list_route
=
'home'
self
.
list_col_defs
=
""
self
.
list_cols
=
""
self
.
list_report
=
(
btn_csv
,
btn_pdf
)
self
.
list_buttons
=
(
btn_add
,
btn_close
)
self
.
columns
=
None
self
.
form_params
=
dict
(
scripts
=
""
)
self
.
list_url
=
""
self
.
list_route
=
''
self
.
allow_view
=
True
self
.
allow_edit
=
True
self
.
allow_delete
=
True
self
.
allow_post
=
False
self
.
allow_unpost
=
False
self
.
allow_check
=
False
self
.
check_field
=
""
self
.
state_save
=
False
self
.
server_side
=
True
self
.
scroll_y
=
False
self
.
scroll_x
=
False
self
.
list_form
=
None
self
.
form_list
=
None
self
.
filter_columns
=
False
self
.
form_scripts
=
"""
$('#parent_nm').bind('typeahead:selected', function(obj, datum) {
$('#parent_id').val(datum.id);
$('#parent_kd').val(datum.kode);
});"""
self
.
form_widget
=
None
self
.
list_schema
=
colander
.
Schema
()
self
.
add_schema
=
colander
.
Schema
()
self
.
edit_schema
=
colander
.
Schema
()
self
.
upload_schema
=
UploadSchema
self
.
upload_exts
=
(
".csv"
,
".tsv"
)
self
.
upload_keys
=
[
"kode"
]
self
.
table
=
Table
self
.
home
=
self
.
req
.
_host
self
.
buttons
=
None
self
.
headers
=
None
self
.
bindings
=
{}
self
.
autocomplete
=
'on'
self
.
action_suffix
=
"/grid/act"
self
.
report_file
=
""
self
.
new_buttons
=
{}
self
.
is_object
=
False
self
.
html_buttons
=
{}
self
.
init_session
(
request
)
def
init_session
(
self
,
request
):
# if not request.user:
if
"g_state"
in
request
.
cookies
:
request
.
response
.
delete_cookie
(
"g_state"
,
'/'
)
...
...
@@ -130,70 +199,21 @@ class BaseView(object):
self
.
jenis
=
'jenis'
in
self
.
params
and
self
.
params
[
'jenis'
]
or
self
.
jenis
self
.
ses
[
'jenis'
]
=
self
.
jenis
self
.
list_route
=
'home'
self
.
list_col_defs
=
""
self
.
list_cols
=
""
# self.list_buttons = 'btn_view, btn_add, btn_edit, btn_delete, ' \
# 'btn_close'
self
.
list_report
=
(
btn_csv
,
btn_pdf
)
# self.list_buttons = (btn_view, btn_add, btn_edit, btn_delete, btn_close)
self
.
list_buttons
=
(
btn_add
,
btn_close
)
self
.
columns
=
None
self
.
form_params
=
dict
(
scripts
=
""
)
self
.
list_url
=
""
self
.
list_route
=
''
self
.
list_schema
=
colander
.
Schema
self
.
allow_view
=
True
self
.
allow_edit
=
True
self
.
allow_delete
=
True
self
.
allow_post
=
False
self
.
allow_unpost
=
False
self
.
allow_check
=
False
self
.
check_field
=
""
self
.
state_save
=
False
self
.
server_side
=
True
self
.
scroll_y
=
False
self
.
scroll_x
=
False
self
.
list_form
=
None
self
.
form_list
=
None
self
.
filter_columns
=
False
self
.
form_scripts
=
"""
$('#parent_nm').bind('typeahead:selected', function(obj, datum) {
$('#parent_id').val(datum.id);
$('#parent_kd').val(datum.kode);
});"""
self
.
form_widget
=
None
self
.
edit_schema
=
colander
.
Schema
()
self
.
add_schema
=
colander
.
Schema
()
self
.
upload_schema
=
UploadSchema
self
.
upload_exts
=
(
".csv"
,
".tsv"
)
self
.
upload_keys
=
[
"kode"
]
self
.
table
=
Table
self
.
home
=
self
.
req
.
_host
self
.
buttons
=
None
self
.
headers
=
None
self
.
bindings
=
{}
self
.
autocomplete
=
'on'
self
.
action_suffix
=
"/grid/act"
self
.
report_file
=
""
self
.
new_buttons
=
{}
self
.
is_object
=
False
self
.
html_buttons
=
{}
def
query_register
(
self
,
**
kwargs
):
pass
def
get_routes
(
self
):
"""
Digunakan untuk mendapatkan default url apab
u
la list_url tidak ada
Digunakan untuk mendapatkan default url apab
i
la list_url tidak ada
"""
def
route_list
(
self
,
**
kwargs
):
"""
Digubakan untk mengalihkan proses setelah add edit view delete data
Default akan di arahkan ke list-route untuk merubah default direction
"""
msg
=
kwargs
.
get
(
"msg"
)
error
=
kwargs
.
get
(
"error"
,
""
)
list_url
=
kwargs
.
get
(
"list_url"
,
self
.
get_routes
())
...
...
@@ -202,7 +222,9 @@ class BaseView(object):
if
not
list_url
:
list_url
=
self
.
req
.
route_url
(
self
.
list_route
,
**
kwargs
)
log
.
error
(
list_url
)
log
.
debug
(
list_url
)
if
self
.
headers
:
return
HTTPFound
(
location
=
get_urls
(
list_url
),
headers
=
self
.
headers
)
...
...
@@ -632,63 +654,120 @@ class BaseView(object):
return
self
.
view_view
(
buttons
=
buttons
)
def
view_upload
(
self
,
**
kw
):
return
self
.
view_import
(
self
,
**
kw
)
# exts = kw.get("exts")
# table = None
# if not exts:
# exts = self.upload_exts
# delimiter = kw.get("delimiter")
# bindings = self.get_bindings()
# form = self.get_form(self.upload_schema, bindings=bindings)
# resources = form.get_widget_resources()
# if self.req.POST:
# if 'save' in self.req.POST:
# input_file = self.req.POST['upload'].file
# filename = self.req.POST['upload'].filename.lower()
# ext = get_ext(filename).lower()
# if ext.lower() not in exts:
# ext = ", ".join([ex for ex in exts])
# self.req.session.flash(f'File harus format {ext}', 'error')
# return self.returned_form(form, table, **kw)
# # return dict(form=form.render(),
# # scripts=self.form_scripts, css=resources["css"],
# # js=resources["js"])
# _here = get_params('tmp', '/tmp')
# folder = os.path.join(_here, 'upload')
# if not os.path.exists(folder):
# os.makedirs(folder)
# fullpath = os.path.join(folder, filename)
# output_file = open(fullpath, 'wb')
# input_file.seek(0)
# while True:
# data = input_file.read(2 << 16)
# if not data:
# break
# output_file.write(data)
# output_file.close()
# try:
# self.save_upload(fullpath, **kw)
# except Exception as e:
# self.req.session.flash(str(e), 'error')
# return dict(form=form.render(),
# scripts=self.form_scripts, css=resources["css"],
# js=resources["js"])
# elif "cancel" in self.req.POST or 'batal' in self.req.POST or "close" in self.req.POST:
# self.cancel_act()
# return self.route_list()
# return self.returned_form(form, table, **kw)
# return dict(form=form.render(),
# scripts=self.form_scripts, css=resources["css"],
# js=resources["js"])
def
view_import
(
self
,
**
kw
):
exts
=
kw
.
get
(
"exts"
)
table
=
None
if
not
exts
:
exts
=
self
.
upload_exts
from
opensipkd.tools
import
Upload
delimiter
=
kw
.
get
(
"delimiter"
)
bindings
=
self
.
get_bindings
()
form
=
self
.
get_form
(
self
.
upload_schema
,
bindings
=
bindings
)
resources
=
form
.
get_widget_resources
()
if
self
.
req
.
POST
:
if
'save'
in
self
.
req
.
POST
:
input_file
=
self
.
req
.
POST
[
'upload'
]
.
file
filename
=
self
.
req
.
POST
[
'upload'
]
.
filename
.
lower
()
ext
=
get_ext
(
filename
)
.
lower
()
if
ext
.
lower
()
not
in
exts
:
ext
=
", "
.
join
([
ex
for
ex
in
exts
])
self
.
req
.
session
.
flash
(
f
'File harus format {ext}'
,
'error'
)
return
dict
(
form
=
form
.
render
(),
scripts
=
self
.
form_scripts
,
css
=
resources
[
"css"
],
js
=
resources
[
"js"
])
_here
=
get_params
(
'tmp'
,
'/tmp'
)
folder
=
os
.
path
.
join
(
_here
,
'
upload
'
)
if
not
os
.
path
.
exists
(
folder
):
os
.
makedirs
(
folder
)
folder
=
os
.
path
.
join
(
_here
,
'
import
'
)
#
if not os.path.exists(folder):
#
os.makedirs(folder)
fullpath
=
os
.
path
.
join
(
folder
,
filename
)
output_file
=
open
(
fullpath
,
'wb'
)
input_file
.
seek
(
0
)
while
True
:
data
=
input_file
.
read
(
2
<<
16
)
if
not
data
:
break
output_file
.
write
(
data
)
output_file
.
close
()
upload
=
Upload
(
folder
)
try
:
file_name
=
upload
.
save
(
self
.
req
,
"upload"
,
exts
)
except
:
self
.
ses
.
flash
(
f
'File harus format {exts}'
,
'error'
)
return
self
.
returned_form
(
form
,
table
,
**
kw
)
# input_file = self.req.POST['upload'].file
# filename = self.req.POST['upload'].filename.lower()
# ext = get_ext(filename).lower()
# if ext.lower() not in exts:
# ext = ", ".join([ex for ex in exts])
# self.req.session.flash(f'File harus format {ext}', 'error')
# return self.returned_form(form, table, **kw)
# fullpath = os.path.join(folder, filename)
# output_file = open(fullpath, 'wb')
# input_file.seek(0)
# while True:
# data = input_file.read(2 << 16)
# if not data:
# break
# output_file.write(data)
# output_file.close()
fullpath
=
os
.
path
.
join
(
folder
,
file_name
)
try
:
self
.
save_upload
(
fullpath
,
**
kw
)
except
Exception
as
e
:
self
.
req
.
session
.
flash
(
str
(
e
),
'error'
)
return
dict
(
form
=
form
.
render
(),
scripts
=
self
.
form_scripts
,
css
=
resources
[
"css"
],
js
=
resources
[
"js"
])
return
self
.
returned_form
(
form
,
table
,
**
kw
)
elif
"cancel"
in
self
.
req
.
POST
or
'batal'
in
self
.
req
.
POST
or
"close"
in
self
.
req
.
POST
:
self
.
cancel_act
()
return
self
.
route_list
()
return
dict
(
form
=
form
.
render
(),
scripts
=
self
.
form_scripts
,
css
=
resources
[
"css"
],
js
=
resources
[
"js"
])
return
self
.
returned_form
(
form
,
table
,
**
kw
)
def
get_file
(
self
,
filename
):
return
open
(
filename
)
def
save_upload
(
self
,
file_name
,
**
args
):
return
append_csv
(
self
.
table
,
file_name
,
self
.
upload_keys
,
get_file_func
=
self
.
get_file
,
update_exist
=
True
,
**
args
)
def
before_add
(
self
):
return
{}
...
...
@@ -1046,6 +1125,7 @@ class BaseView(object):
return
value
[
"filename"
]
@colander.deferred
def
deferred_status
(
node
,
kw
):
values
=
kw
.
get
(
'daftar_status'
,
[])
...
...
pyproject.toml
View file @
1629d3c
...
...
@@ -2,6 +2,9 @@
requires
=
[
'setuptools
>=
64
'
,
'wheel'
]
build-backend
=
'setuptools.build_meta'
[tool.hatch.build]
dev-mode-dirs
=
["."]
[tool.mypy]
exclude
=
[
'base'
,
...
...
@@ -12,14 +15,12 @@ exclude = [
show_error_codes
=
true
[[tool.mypy.overrides]]
module
=
[
"opensipkd.base.*"
,
"opensipkd.models.*"
,
"opensipkd.detable.*"
,
"opensipkd.jsonrpc_auth.*"
,
]
warn_unused_ignores
=
true
strict
=
true
...
...
Write
Preview
Markdown
is supported
Attach a file
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to post a comment