Skip to content
Toggle navigation
Projects
Groups
Snippets
Help
aa.gusti
/
opensipkd-base
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Settings
Activity
Graph
Charts
Create a new issue
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit aaa076ad
authored
Sep 12, 2025
by
aa.gustiana@gmail.com
Browse Files
Options
Browse Files
Tag
Download
Email Patches
Plain Diff
Refactor API views and models for improved structure; add pagination support and…
… enhance error handling
1 parent
e182a3f7
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
600 additions
and
88 deletions
opensipkd/base/__init__.py
opensipkd/base/models/base.py
opensipkd/base/views/api_base.py
opensipkd/base/views/api_messages.py
opensipkd/base/views/restful_pagination/__init__.py
opensipkd/base/views/restful_pagination/pagenumber.py
pyproject.toml
opensipkd/base/__init__.py
View file @
aaa076a
...
...
@@ -5,18 +5,18 @@ import importlib
import
csv
import
re
import
datetime
import
deform
import
decimal
from
opensipkd.tools
import
get_settings
,
DefaultTimeZone
,
dmy
,
dmyhms
,
get_ext
import
deform
from
pkg_resources
import
resource_filename
from
pyramid.renderers
import
JSON
from
pyramid_beaker
import
session_factory_from_settings
from
pyramid.config
import
Configurator
from
pyramid.events
import
NewRequest
,
BeforeRender
,
subscriber
from
pyramid_mailer
import
mailer_factory_from_settings
from
sqlalchemy
import
engine_from_config
from
opensipkd.tools
import
get_settings
,
DefaultTimeZone
,
dmy
,
dmyhms
,
get_ext
from
.security
import
MySecurityPolicy
,
get_user
from
sqlalchemy
import
engine_from_config
from
.models.base
import
DBSession
from
.models.handlers
import
LogDBSession
from
.models.meta
import
Base
...
...
@@ -427,7 +427,7 @@ def _add_view_config(config, paket, route):
except
Exception
as
e
:
_logging
.
error
(
"Add View Config :{code} Kode {error}"
.
format
(
code
=
route
[
"kode"
],
error
=
str
(
e
)))
_logging
.
debug
(
f
"Route: {route.get('kode')} {route.get('path')}"
)
#
_logging.debug(f"Route: {route.get('kode')} {route.get('path')}")
class
BaseApp
():
...
...
opensipkd/base/models/base.py
View file @
aaa076a
...
...
@@ -82,6 +82,11 @@ class CommonModel(object):
class
DefaultModel
(
CommonModel
):
id
=
Column
(
Integer
,
primary_key
=
True
)
db_session
=
DBSession
def
__init__
(
self
):
super
()
.
__init__
()
self
.
db_session
=
DBSession
@classmethod
def
save
(
cls
,
values
,
row
=
None
,
**
kwargs
):
if
not
row
:
...
...
@@ -91,7 +96,7 @@ class DefaultModel(CommonModel):
@classmethod
def
count
(
cls
,
db_session
=
DBSession
):
return
db_session
.
query
(
func
.
count
(
'id'
))
.
scalar
()
return
self
.
db_session
.
query
(
func
.
count
(
'id'
))
.
scalar
()
@classmethod
def
query
(
cls
,
db_session
=
DBSession
,
filters
=
None
):
...
...
opensipkd/base/views/api_base.py
View file @
aaa076a
import
datetime
from
datetime
import
datetime
,
date
import
logging
from
decimal
import
Decimal
from
deform
import
Form
from
deform
import
Form
,
ValidationFailure
from
pyramid.response
import
Response
from
pyramid.exceptions
import
HTTPNotFound
from
pyramid.exceptions
import
HTTPNotFound
,
HTTPBadRequest
from
opensipkd.base.models
import
DBSession
from
opensipkd.tools.buttons
import
btn_save
,
btn_cancel
from
opensipkd.base.views.common
import
DataTables
,
ColumnDT
from
deform.widget
import
SelectWidget
import
colander
from
.
import
api_messages
from
..tools
import
obj2json
from
pyramid.response
import
Response
from
pyramid_restful.views
import
APIView
import
json
_log
=
logging
.
getLogger
(
__name__
)
class
ApiViews
(
APIView
):
def
__init__
(
self
,
**
kwargs
):
super
()
.
__init__
(
**
kwargs
)
self
.
req
uest
=
kwargs
.
get
(
"request"
,
None
)
self
.
req
=
kwargs
.
get
(
"request"
,
None
)
self
.
db_session
=
DBSession
self
.
buttons
=
(
btn_save
,
btn_cancel
)
self
.
table
=
None
self
.
pkey
=
(
"id"
)
self
.
pkey
=
(
"id"
,
)
self
.
orders
=
None
self
.
bindings
=
{}
self
.
id
=
-
1
self
.
autocomplete
=
True
self
.
form_widget
=
None
self
.
columns
=
None
self
.
list_schema
=
None
self
.
add_schema
=
None
self
.
edit_schema
=
None
self
.
psize
=
25
self
.
page
=
1
self
.
psize
=
25
def
list_join
(
self
,
query
,
**
kw
):
return
query
def
list_filter
(
self
,
query
,
**
kw
):
return
query
self
.
page
=
1
def
get_list
(
self
,
**
kwargs
):
"""
parameter
list_schema optional
list_join callback
list_filter callback
"""
url
=
[]
select_list
=
{}
list_schema
=
kwargs
.
get
(
"list_schema"
)
if
not
list_schema
:
list_schema
=
self
.
list_schema
and
self
.
list_schema
or
self
.
form_list
if
not
self
.
columns
:
columns
=
[]
for
d
in
list_schema
():
global_search
=
True
search_method
=
hasattr
(
d
,
"search_method"
)
\
and
getattr
(
d
,
"search_method"
)
or
"string_contains"
if
hasattr
(
d
,
"global_search"
):
if
d
.
global_search
==
False
:
global_search
=
False
if
hasattr
(
d
,
"field"
):
if
type
(
d
.
field
)
==
str
:
columns
.
append
(
ColumnDT
(
getattr
(
self
.
table
,
d
.
field
),
mData
=
d
.
name
,
global_search
=
global_search
,
search_method
=
search_method
))
else
:
columns
.
append
(
ColumnDT
(
d
.
field
,
mData
=
d
.
name
,
global_search
=
global_search
,
search_method
=
search_method
))
else
:
columns
.
append
(
ColumnDT
(
getattr
(
self
.
table
,
d
.
name
),
mData
=
d
.
name
,
global_search
=
global_search
,
search_method
=
search_method
))
if
hasattr
(
d
,
"widget"
):
if
d
.
widget
:
_log
.
debug
(
d
.
widget
)
if
type
(
d
.
widget
)
is
SelectWidget
:
select_list
[
d
.
name
]
=
d
.
widget
.
values
if
hasattr
(
d
,
"url"
):
url
.
append
(
d
.
name
)
else
:
columns
=
self
.
columns
# def __init__(self, request):
# self.request = request
# self.id = self.request.matchdict.get("id")
query
=
self
.
db_session
.
query
()
.
select_from
(
self
.
table
)
list_join
=
kwargs
.
get
(
'list_join'
)
if
list_join
is
not
None
:
query
=
list_join
(
query
,
**
kwargs
)
else
:
query
=
self
.
list_join
(
query
,
**
kwargs
)
if
self
.
req
.
user
and
self
.
req
.
user
.
company_id
and
hasattr
(
self
.
table
,
"company_id"
):
query
=
query
.
filter
(
self
.
table
.
company_id
==
self
.
req
.
user
.
company_id
)
list_filter
=
kwargs
.
get
(
'list_filter'
)
if
list_filter
is
not
None
:
query
=
list_filter
(
query
,
**
kwargs
)
else
:
query
=
self
.
list_filter
(
query
,
**
kwargs
)
# log.debug(str(columns))
# qry = query.add_columns(*[c.sqla_expr for c in columns])
# log.debug(str(qry))
row_table
=
DataTables
(
self
.
req
.
GET
,
query
,
columns
)
result
=
row_table
.
output_result
()
data
=
result
and
result
.
get
(
"data"
)
or
{}
for
res
in
data
:
for
k
in
res
:
if
k
in
select_list
.
keys
():
vals
=
select_list
[
k
]
for
r
in
vals
:
if
r
and
str
(
r
)
==
str
(
res
[
k
]):
res
[
k
]
=
vals
[
r
]
# for k, v in d.items():
# if k in url and v:
# link = "/".join([self.home, nik_url, v])
# d[k] =f'<a href="{link}" target="_blank">View</a>'
return
result
def
not_found
(
self
,
msg
=
None
):
if
not
msg
:
msg
=
api_messages
.
NOT_FOUND
return
Response
(
json
=
msg
,
status
=
404
)
def
obj2json
(
self
,
obj
):
return
obj2json
(
obj
)
...
...
@@ -47,6 +148,16 @@ class ApiViews(APIView):
def
form_validator
(
self
,
form
,
controls
):
"""Get Validator Form"""
def
form_validate
(
self
,
form
,
controls
):
"""Validate form"""
try
:
data
=
form
.
validate
(
controls
)
return
data
except
ValidationFailure
as
e
:
raise
HTTPBadRequest
(
explanation
=
str
(
e
.
error
.
asdict
()))
return
dict
(
data
)
def
get_form
(
self
,
class_form
,
row
=
None
,
buttons
=
(
btn_save
,
btn_cancel
),
**
kwargs
):
buttons
=
self
.
buttons
and
self
.
buttons
or
buttons
...
...
@@ -71,62 +182,74 @@ class ApiViews(APIView):
form_params
[
"widget"
]
=
self
.
form_widget
schema
=
class_form
(
**
form_params
)
schema
=
schema
.
bind
(
request
=
self
.
req
uest
,
**
bindings
)
schema
.
request
=
self
.
request
schema
=
schema
.
bind
(
request
=
self
.
req
,
**
bindings
)
schema
.
request
=
self
.
req
if
row
:
schema
.
deserialize
(
row
)
return
Form
(
schema
,
buttons
=
buttons
,
autocomplete
=
self
.
autocomplete
)
def
get_filters
(
self
,
query
,
**
kw
):
return
query
def
get_orders
(
self
,
query
,
**
kw
):
table
=
kw
.
get
(
"table"
,
self
.
table
)
if
not
self
.
orders
:
self
.
orders
=
self
.
pkey
query
=
query
.
order_by
(
*
(
getattr
(
table
,
k
)
for
k
in
self
.
orders
))
return
query
def
get_joins
(
self
,
query
,
**
kw
):
return
query
def
get_groups
(
self
,
query
,
**
kw
):
return
query
def
query
(
self
,
**
kw
):
self
.
psize
=
int
(
self
.
request
.
params
.
get
(
"size"
,
25
))
self
.
page
=
int
(
self
.
request
.
params
.
get
(
"page"
,
1
))
table
=
kw
.
get
(
"table"
,
self
.
table
)
get_filters
=
kw
.
get
(
"filters"
,
self
.
get_filters
)
get_joins
=
kw
.
get
(
"joins"
,
self
.
get_joins
)
get_groups
=
kw
.
get
(
"groups"
,
self
.
get_groups
)
get_orders
=
kw
.
get
(
"orders"
,
self
.
get_orders
)
query
=
self
.
db_session
.
query
(
table
)
query
=
get_joins
(
query
,
**
kw
)
query
=
get_groups
(
query
,
**
kw
)
query
=
get_filters
(
query
,
**
kw
)
query
=
get_orders
(
query
,
**
kw
)
query
=
query
.
limit
(
self
.
psize
)
.
offset
((
self
.
page
-
1
)
*
self
.
psize
)
return
query
def
query_id
(
self
,
**
kw
):
table
=
kw
.
get
(
"table"
,
self
.
table
)
get_joins
=
kw
.
get
(
"joins"
,
self
.
get_joins
)
get_groups
=
kw
.
get
(
"groups"
,
self
.
get_groups
)
get_filters
=
kw
.
get
(
"filters"
,
self
.
get_filters
)
get_orders
=
kw
.
get
(
"orders"
,
self
.
get_orders
)
if
hasattr
(
table
,
"query_id"
)
and
self
.
id
:
query
=
table
.
query_id
(
self
.
id
)
query
=
get_joins
(
query
,
**
kw
)
query
=
get_groups
(
query
,
**
kw
)
query
=
get_filters
(
query
,
**
kw
)
query
=
get_orders
(
query
,
**
kw
)
return
query
return
self
.
query
(
**
kw
)
# def get_filters(self, query, **kw):
# return query
# def get_orders(self, query, **kw):
# table = kw.get("table", self.table)
# if not self.orders:
# self.orders = self.pkey
# query = query.order_by(*(getattr(table, k) for k in self.orders))
# return query
# def get_joins(self, query, **kw):
# return query
# def get_groups(self, query, **kw):
# return query
# def query(self, **kw):
# self.psize = int(self.req.params.get("size", 25))
# self.page = int(self.req.params.get("page", 1))
# table = kw.get("table", self.table)
# get_filters = kw.get("filters", self.get_filters)
# get_joins = kw.get("joins", self.get_joins)
# get_groups = kw.get("groups", self.get_groups)
# get_orders = kw.get("orders", self.get_orders)
# query = self.db_session.query(table)
# query = get_joins(query, **kw)
# query = get_groups(query, **kw)
# query = get_filters(query, **kw)
# query = get_orders(query, **kw)
# query = query.limit(self.psize).offset((self.page - 1) * self.psize)
# return query
# def query_id(self, **kw):
# table = kw.get("table", self.table)
# get_joins = kw.get("joins", self.get_joins)
# get_groups = kw.get("groups", self.get_groups)
# get_filters = kw.get("filters", self.get_filters)
# get_orders = kw.get("orders", self.get_orders)
# if hasattr(table, "query_id") and self.id:
# query = table.query_id(self.id)
# query = get_joins(query, **kw)
# query = get_groups(query, **kw)
# query = get_filters(query, **kw)
# query = get_orders(query, **kw)
# return query
# return self.query(**kw)
def
json_adapter
(
self
,
obj
):
if
isinstance
(
obj
,
datetime
):
return
obj
.
strftime
(
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
)
elif
isinstance
(
obj
,
date
):
return
obj
.
strftime
(
'
%
Y-
%
m-
%
d'
)
elif
isinstance
(
obj
,
Decimal
):
return
float
(
obj
)
elif
isinstance
(
obj
,
colander
.
_null
):
return
None
else
:
return
obj
def
success
(
self
,
data
,
msg
=
None
):
"""
...
...
@@ -141,26 +264,46 @@ class ApiViews(APIView):
data
.
update
(
msg
)
return
data
# def get(self, self.req. *args, **kwargs):
# self.req = request
# query=self.query()
# if not query.first():
# return HTTPNotFound()
# data = []
# for row in query:
# d = dict(row.__dict__)
# d.pop('_sa_instance_state', None)
# for key, value in d.items():
# if isinstance(value, datetime.datetime):
# d[key] = value.isoformat()
# elif isinstance(value, Decimal):
# d[key] = float(value)
# data.append(d)
# return Response(json=self.success(data=data))
def
_get
(
self
,
request
,
*
args
,
**
kwargs
):
self
.
req
=
request
if
"draw"
not
in
self
.
req
.
params
:
self
.
req
.
GET
.
add
(
'draw'
,
"1"
)
if
"length"
not
in
self
.
req
.
params
:
self
.
req
.
GET
.
add
(
'length'
,
"10"
)
if
"start"
not
in
self
.
req
.
params
:
self
.
req
.
GET
.
add
(
'start'
,
"0"
)
rec_id
=
self
.
req
.
matchdict
.
get
(
'id'
)
if
rec_id
:
self
.
req
.
GET
.
add
(
'columns[0][search][value]'
,
rec_id
)
return
self
.
get_list
(
**
kwargs
)
def
get_custom_render
(
self
,
data
):
return
data
def
get
(
self
,
request
,
*
args
,
**
kwargs
):
self
.
request
=
request
query
=
self
.
query
()
if
not
query
.
first
():
return
HTTPNotFound
()
data
=
[]
for
row
in
query
:
d
=
dict
(
row
.
__dict__
)
d
.
pop
(
'_sa_instance_state'
,
None
)
for
key
,
value
in
d
.
items
():
if
isinstance
(
value
,
datetime
.
datetime
):
d
[
key
]
=
value
.
isoformat
()
elif
isinstance
(
value
,
Decimal
):
d
[
key
]
=
float
(
value
)
data
.
append
(
d
)
return
Response
(
json
=
self
.
success
(
data
=
data
))
d
=
self
.
_get
(
request
,
*
args
,
**
kwargs
)
d
=
self
.
get_custom_render
(
d
)
return
Response
(
json
=
json
.
loads
(
json
.
dumps
(
d
,
default
=
self
.
json_adapter
)))
def
post
(
self
,
request
,
*
args
,
**
kwargs
):
self
.
req
uest
=
request
return
self
.
req
uest
self
.
req
=
request
return
self
.
req
def
delete
(
self
):
query
=
self
.
db_session
.
query
(
self
.
table
)
...
...
@@ -169,13 +312,32 @@ class ApiViews(APIView):
if
not
row
:
return
HTTPNotFound
()
return
Response
(
json
=
self
.
success
())
def
put
(
self
,
data
):
self
.
req
uest
=
data
return
self
.
req
uest
self
.
req
=
data
return
self
.
req
def
patch
(
self
,
data
):
self
.
request
=
data
return
self
.
request
self
.
req
=
data
return
self
.
req
def
save
(
self
,
data
,
user
=
None
,
obj
=
None
):
if
not
user
:
user
=
self
.
req
.
session
.
user
if
not
obj
:
obj
=
self
.
table
()
obj
.
create_date
=
datetime
.
now
()
obj
.
create_uid
=
user
and
user
.
id
or
None
obj
.
enabled
=
1
else
:
obj
.
write_date
=
datetime
.
now
()
obj
.
write_uid
=
user
and
user
.
id
or
None
for
key
,
value
in
data
.
items
():
if
hasattr
(
obj
,
key
):
setattr
(
obj
,
key
,
value
)
self
.
db_session
.
add
(
obj
)
self
.
db_session
.
flush
()
return
obj
opensipkd/base/views/api_messages.py
View file @
aaa076a
from
http.client
import
NOT_FOUND
SUCCESS
=
{
"error"
:
{
"code"
:
"0000"
,
...
...
@@ -18,3 +21,10 @@ PAYMENTMISMATCH = {
"msg"
:
"Jumlah pembayaran tidak sesuai"
}
}
NOT_FOUND
=
{
"error"
:
{
"code"
:
"404"
,
"msg"
:
"Data Tidak Ditemukan"
}
}
opensipkd/base/views/restful_pagination/__init__.py
0 → 100644
View file @
aaa076a
# from pyramid_restful.pagination.base import BasePagination
# from .pagenumber import PageNumberPagination
# from pyramid_restful.pagination.linkheader import LinkHeaderPagination
opensipkd/base/views/restful_pagination/pagenumber.py
0 → 100644
View file @
aaa076a
# from typing import Sequence
# import warnings
# import six
# from math import ceil
# from collections import OrderedDict
# from sqlalchemy.orm.query import Query
# from pyramid.exceptions import HTTPNotFound
# from pyramid.response import Response
# from pyramid_restful.settings import api_settings
# from pyramid_restful.pagination.utilities import remove_query_param, replace_query_param
# from pyramid_restful.pagination.base import BasePagination
# __all__ = ['PageNumberPagination']
# def _positive_int(integer_string, strict=False, cutoff=None):
# """
# Cast a string to a strictly positive integer.
# """
# ret = int(integer_string)
# if ret < 0 or (ret == 0 and strict):
# raise ValueError()
# if cutoff:
# ret = min(ret, cutoff)
# return ret
# class InvalidPage(Exception):
# pass
# class PageNotAnInteger(InvalidPage):
# pass
# class EmptyPage(InvalidPage):
# pass
# class UnorderedObjectListWarning(RuntimeWarning):
# pass
# class Paginator:
# def __init__(self, object_list, per_page, orphans=0, allow_empty_first_page=True):
# self.object_list = object_list
# self._check_object_list_is_ordered()
# self.per_page = int(per_page)
# self.orphans = int(orphans)
# self.allow_empty_first_page = allow_empty_first_page
# def validate_number(self, number):
# """
# Validates the given 1-based page number.
# """
# try:
# number = int(number)
# except (TypeError, ValueError):
# raise PageNotAnInteger('That page number is not an integer')
# if number < 1:
# raise EmptyPage('That page number is less than 1')
# if number > self.num_pages:
# if number == 1 and self.allow_empty_first_page:
# pass
# else:
# raise EmptyPage('That page contains no results')
# return number
# def page(self, number):
# """
# Returns a Page object for the given 1-based page number.
# """
# number = self.validate_number(number)
# bottom = (number - 1) * self.per_page
# top = bottom + self.per_page
# if top + self.orphans >= self.count:
# top = self.count
# return self._get_page(self.object_list[bottom:top], number, self)
# def _get_page(self, *args, **kwargs):
# """
# Returns an instance of a single page.
# This hook can be used by subclasses to use an alternative to the
# standard :cls:`Page` object.
# """
# return Page(*args, **kwargs)
# @property
# def count(self):
# """
# Returns the total number of objects, across all pages.
# """
# try:
# return self.object_list.count()
# except (AttributeError, TypeError):
# # AttributeError if object_list has no count() method.
# # TypeError if object_list.count() requires arguments
# # (i.e. is of type list).
# return len(self.object_list)
# @property
# def num_pages(self):
# """
# Returns the total number of pages.
# """
# if self.count == 0 and not self.allow_empty_first_page:
# return 0
# hits = max(1, self.count - self.orphans)
# return int(ceil(hits / float(self.per_page)))
# @property
# def page_range(self):
# """
# Returns a 1-based range of pages for iterating through within
# a template for loop.
# """
# return range(1, self.num_pages + 1)
# def _check_object_list_is_ordered(self):
# """
# Warn if self.object_list is unordered (typically a QuerySet).
# """
# if hasattr(self.object_list, 'ordered') and not self.object_list.ordered:
# warnings.warn(
# 'Pagination may yield inconsistent results with an unordered '
# 'object_list: {!r}'.format(self.object_list),
# UnorderedObjectListWarning
# )
# class Page(Sequence):
# def __init__(self, object_list, number, paginator):
# self.object_list = object_list
# self.number = number
# self.paginator = paginator
# def __repr__(self):
# return '<Page %s of %s>' % (self.number, self.paginator.num_pages)
# def __len__(self):
# return len(self.object_list)
# def __getitem__(self, index):
# if not isinstance(index, (int, slice)):
# raise TypeError
# # The object_list is converted to a list so that if it was a QuerySet
# # it won't be a database hit per __getitem__.
# if not isinstance(self.object_list, list):
# self.object_list = list(self.object_list)
# return self.object_list[index]
# def has_next(self):
# return self.number < self.paginator.num_pages
# def has_previous(self):
# return self.number > 1
# def has_other_pages(self):
# return self.has_previous() or self.has_next()
# def next_page_number(self):
# return self.paginator.validate_number(self.number + 1)
# def previous_page_number(self):
# return self.paginator.validate_number(self.number - 1)
# def start_index(self):
# """
# Returns the 1-based index of the first object on this page,
# relative to total objects in the paginator.
# """
# # Special case, return zero if no items.
# if self.paginator.count == 0:
# return 0
# return (self.paginator.per_page * (self.number - 1)) + 1
# def end_index(self):
# """
# Returns the 1-based index of the last object on this page,
# relative to total objects found (hits).
# """
# # Special case for the last page because there can be orphans.
# if self.number == self.paginator.num_pages:
# return self.paginator.count
# return self.number * self.paginator.per_page
# class PageNumberPagination(BasePagination):
# """
# A simple page number based style that supports page numbers as query parameters.
# For example::
# http://api.example.org/accounts/?page=4
# http://api.example.org/accounts/?page=4&page_size=100
# page_size can be overridden as class attribute::
# class MyPager(PageNumberPagination):
# page_size = 10
# The resulting response JSON has four attributes, count, next, previous and results. Count indicates the
# total number of objects before pagination. Next and previous contain URLs that can be used to retrieve the next
# and previous pages of date respectively. The results attribute contains the list of objects that belong to page
# of data.
# Example::
# {
# 'count': 50,
# 'next': 'app.myapp.com/api/users?page=3',
# 'previous': 'app.myapp.com/api/users?page=1',
# 'results': [
# {id: 4, 'email': 'user4@myapp.com', 'name': 'John Doe'},
# {id: 5, 'email': 'user5@myapp.com', 'name': 'Jan Doe'}
# ]
# }
# """
# page_size = api_settings.page_size
# paginator_class = Paginator
# # Client can control the page using this query parameter.
# page_query_param = 'page'
# # Client can control the page size using this query parameter.
# # Default is 'None'. Set to eg 'page_size' to enable usage.
# page_size_query_param = None
# # Set to an integer to limit the maximum page size the client may request.
# # Only relevant if 'page_size_query_param' has also been set.
# max_page_size = None
# last_page_strings = ('last',)
# invalid_page_message = 'Invalid page "{page_number}": {message}.'
# def paginate_query(self, query, request):
# self.request = request
# # Force the execution of the query, so we don't make unnecessary db calls
# data = query.all() if isinstance(query, Query) else query
# page_size = self.get_page_size(request)
# if not page_size:
# return None
# page_number = request.params.get(self.page_query_param, 1)
# paginator = self.paginator_class(data, page_size)
# if page_number in self.last_page_strings:
# page_number = paginator.num_pages
# try:
# self.page = paginator.page(page_number)
# except InvalidPage as exc:
# msg = self.invalid_page_message.format(
# page_number=page_number, message=six.text_type(exc)
# )
# raise HTTPNotFound(msg)
# return list(self.page)
# def get_paginated_response(self, data):
# return Response(json=OrderedDict([
# ('count', self.page.paginator.count),
# ('next', self.get_next_link()),
# ('previous', self.get_previous_link()),
# ('results', data)
# ]))
# def get_page_size(self, request):
# if self.page_size_query_param:
# try:
# return _positive_int(
# request.params[self.page_size_query_param],
# strict=True,
# cutoff=self.max_page_size
# )
# except (KeyError, ValueError):
# pass
# return self.page_size
# def get_next_link(self):
# if not self.page.has_next():
# return None
# url = self.get_url_root()
# page_number = self.page.next_page_number()
# return replace_query_param(url, self.page_query_param, page_number)
# def get_previous_link(self):
# if not self.page.has_previous():
# return None
# url = self.get_url_root()
# page_number = self.page.previous_page_number()
# if page_number == 1:
# return remove_query_param(url, self.page_query_param)
# return replace_query_param(url, self.page_query_param, page_number)
# def get_url_root(self):
# """
# Override this if you need a different root url.
# For example if the app is behind a reverse proxy and you
# want to use the original host in the X-Forwarded-Host header.
# """
# return self.request.current_route_url()
pyproject.toml
View file @
aaa076a
...
...
@@ -25,6 +25,7 @@ dependencies = [
"pyramid_mailer"
,
'pyramid_chameleon'
,
'pyramid_rpc'
,
"pyramid-restful-framework"
'pytz'
,
'psycopg
2
-binary'
,
'requests'
,
...
...
Write
Preview
Markdown
is supported
Attach a file
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to post a comment