Skip to content
Toggle navigation
Projects
Groups
Snippets
Help
aa.gusti
/
opensipkd-tools
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Settings
Activity
Graph
Charts
Create a new issue
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit b0a3cd23
authored
Dec 19, 2022
by
aagusti
Browse Files
Options
Browse Files
Tag
Download
Plain Diff
perbaikan upload
2 parents
ed268c24
ac4770c0
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
2070 additions
and
1034 deletions
opensipkd/tools/__init__.py
opensipkd/tools/tools.init
opensipkd/tools/__init__.py
View file @
b0a3cd2
from
__future__
import
print_function
import
os
import
re
import
mimetypes
import
csv
import
calendar
import
datetime
from
random
import
choice
from
string
import
(
ascii_uppercase
,
ascii_lowercase
,
digits
,
)
import
locale
import
colander
import
pytz
import
io
from
pyramid.httpexceptions
import
HTTPNotFound
from
pyramid.threadlocal
import
get_current_registry
from
json
import
JSONEncoder
import
logging
log
=
logging
.
getLogger
(
__name__
)
################
# Phone number #
################
MSISDN_ALLOW_CHARS
=
map
(
lambda
x
:
str
(
x
),
range
(
10
))
# + ['+']
def
get_msisdn
(
msisdn
,
country
=
'+62'
):
"""
Digunakan untuk pengecekan no telp suatu negara
:param msisdn:
:param country:
:return: output kode negara+no msisdn
"""
# for ch in msisdn:
# if ch not in MSISDN_ALLOW_CHARS:
# raise Exception(f'{ch} not in {print(x for x in range(10))}')
try
:
i
=
int
(
msisdn
)
except
ValueError
as
e
:
log
.
info
(
e
)
return
if
not
i
:
raise
Exception
(
'MSISDN must filled'
)
if
len
(
str
(
i
))
<
7
:
raise
Exception
(
'MSISDN less then 7 number'
)
if
re
.
compile
(
r'^\+'
)
.
search
(
msisdn
):
return
msisdn
if
re
.
compile
(
r'^0'
)
.
search
(
msisdn
):
return
'
%
s
%
s'
%
(
country
,
msisdn
.
lstrip
(
'0'
))
################
# Money format #
################
def
should_int
(
value
):
int_
=
int
(
value
)
if
int_
==
value
:
return
int_
return
value
def
thousand
(
value
,
float_count
=
None
):
"""
Memisahkan ribuan
:param value:
:param float_count:
:return: string
"""
if
not
value
:
return
"0"
if
float_count
is
None
:
# autodetection
if
type
(
value
)
in
(
int
,
float
):
float_count
=
0
else
:
float_count
=
2
return
locale
.
format_string
(
'
%%
.
%
df'
%
float_count
,
value
,
True
)
def
money
(
value
,
float_count
=
None
,
currency
=
None
):
"""
Memisahkan ribuan dan menambahkan nama mata uang
:param value:
:param float_count:
:param currency:
:return:
"""
if
value
<
0
:
v
=
abs
(
value
)
format_
=
'(
%
s)'
else
:
v
=
value
format_
=
'
%
s'
if
currency
is
None
:
currency
=
locale
.
localeconv
()[
'currency_symbol'
]
s
=
' '
.
join
([
currency
,
thousand
(
v
,
float_count
)])
return
format_
%
s
def
round_up
(
n
):
i
=
int
(
n
)
if
n
==
i
:
return
i
if
n
>
i
:
return
i
+
1
return
i
-
1
###########
# Pyramid #
###########
def
get_settings
():
return
get_current_registry
()
.
settings
def
get_params
(
params
,
alternate
=
None
):
"""
Digunakan untuk mengambil nilai dari konfigurasi sesuai params yang disebut
:param params: variable
:param alternate: default apabila tidak ditemukan data/params
:return: value
contoh penggunaan:
get_params('devel', False)
"""
settings
=
get_settings
()
result
=
settings
and
params
in
settings
and
settings
[
params
]
.
strip
()
or
None
return
result
and
result
or
alternate
def
devel
():
settings
=
get_settings
()
is_devel
=
'devel'
in
settings
and
settings
[
'devel'
]
and
True
or
False
print
(
"DEBUG>>"
,
is_devel
)
return
is_devel
def
get_timezone
():
settings
=
get_settings
()
return
pytz
.
timezone
(
settings
[
'timezone'
])
def
get_modules
():
settings
=
get_settings
()
return
settings
[
'modules'
]
.
split
(
','
)
########
# Time #
########
one_second
=
datetime
.
timedelta
(
1.0
/
24
/
60
/
60
)
DateType
=
type
(
datetime
.
date
.
today
())
DateTimeType
=
type
(
datetime
.
datetime
.
now
())
TimeZoneFile
=
'/etc/timezone'
if
os
.
path
.
exists
(
TimeZoneFile
):
DefaultTimeZone
=
open
(
TimeZoneFile
)
.
read
()
.
strip
()
else
:
DefaultTimeZone
=
'Asia/Jakarta'
def
as_timezone
(
tz_date
):
localtz
=
get_timezone
()
if
not
tz_date
.
tzinfo
:
tz_date
=
create_datetime
(
tz_date
.
year
,
tz_date
.
month
,
tz_date
.
day
,
tz_date
.
hour
,
tz_date
.
minute
,
tz_date
.
second
,
tz_date
.
microsecond
)
return
tz_date
.
astimezone
(
localtz
)
def
create_datetime
(
year
,
month
,
day
,
hour
=
0
,
minute
=
7
,
second
=
0
,
microsecond
=
0
):
tz
=
get_timezone
()
dt
=
datetime
.
datetime
(
year
,
month
,
day
,
hour
,
minute
,
second
,
microsecond
)
return
tz
.
localize
(
dt
)
def
create_date
(
year
,
month
,
day
):
return
create_datetime
(
year
,
month
,
day
)
def
create_now
():
tz
=
get_timezone
()
return
datetime
.
datetime
.
now
(
tz
)
def
date_from_str
(
value
):
separator
=
None
value
=
value
.
split
()[
0
]
# dd-mm-yyyy HH:MM:SS
for
s
in
[
'-'
,
'/'
,
'.'
]:
if
value
.
find
(
s
)
>
-
1
:
separator
=
s
break
if
separator
:
t
=
list
(
map
(
lambda
x
:
int
(
x
),
value
.
split
(
separator
)))
y
,
m
,
d
=
t
[
2
],
t
[
1
],
t
[
0
]
if
d
>
999
:
# yyyy-mm-dd
y
,
d
=
d
,
y
else
:
y
,
m
,
d
=
int
(
value
[:
4
]),
int
(
value
[
4
:
6
]),
int
(
value
[
6
:])
return
datetime
.
date
(
y
,
m
,
d
)
def
time_from_str
(
value
):
# separator = ":"
value
=
value
.
split
()[
1
]
# dd-mm-yyyy HH:MM:SS
# return value.strptime('%H:%M:%S')
h
,
m
,
s
=
value
.
split
(
":"
)
# return datetime.time(h, m, s)
return
datetime
.
timedelta
(
hours
=
int
(
h
),
minutes
=
int
(
m
),
seconds
=
int
(
s
))
def
datetime_from_str
(
value
):
# separator = None
dt
=
date_from_str
(
value
)
tm
=
time_from_str
(
value
)
return
datetime
.
datetime
(
dt
.
year
,
dt
.
month
,
dt
.
day
)
+
tm
def
dmy
(
tgl
):
return
tgl
.
strftime
(
'
%
d-
%
m-
%
Y'
)
def
hms
(
tgl
):
return
tgl
.
strftime
(
'
%
H:
%
M:
%
S'
)
def
dmy_to_date
(
tgl
):
return
datetime
.
datetime
.
strptime
(
tgl
,
'
%
d-
%
m-
%
Y'
)
def
dmyhms
(
tgl
):
return
tgl
.
strftime
(
'
%
d-
%
m-
%
Y
%
H:
%
M:
%
S'
)
def
ymd
(
tgl
):
return
tgl
.
strftime
(
'
%
Y-
%
m-
%
d'
)
def
ymdhms
(
tgl
):
return
tgl
.
strftime
(
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
)
def
dMy
(
tgl
):
return
str
(
tgl
.
day
)
+
' '
+
NAMA_BULAN
[
tgl
.
month
][
1
]
+
' '
+
str
(
tgl
.
year
)
def
tampil_bulan
(
bulan
):
return
NAMA_BULAN
[
bulan
][
1
]
def
next_month
(
year
,
month
):
if
month
==
12
:
month
=
1
year
+=
1
else
:
month
+=
1
return
year
,
month
def
best_date
(
year
,
month
,
day
):
try
:
return
datetime
.
date
(
year
,
month
,
day
)
except
ValueError
:
last_day
=
calendar
.
monthrange
(
year
,
month
)[
1
]
return
datetime
.
date
(
year
,
month
,
last_day
)
def
next_month_day
(
year
,
month
,
day
):
year
,
month
=
next_month
(
year
,
month
)
return
best_date
(
year
,
month
,
day
)
def
count_day
(
date
):
date
=
str
(
date
)
# 2018-01-02 list
year
=
int
(
date
[:
4
])
if
date
[
5
]
==
'0'
:
month
=
int
(
date
[
6
])
else
:
month
=
int
(
date
[
5
:
7
])
count_day
=
calendar
.
monthrange
(
year
,
month
)
return
int
(
count_day
[
1
])
NAMA_BULAN
=
(
(
0
,
"--Bulan--"
),
(
1
,
"Januari"
,
"Jan"
),
(
2
,
"Februari"
,
"Feb"
),
(
3
,
"Maret"
,
"Mar"
),
(
4
,
"April"
,
"Apr"
),
(
5
,
"Mei"
,
"Mei"
),
(
6
,
"Juni"
,
"Jun"
),
(
7
,
"Juli"
,
"Jul"
),
(
8
,
"Agustus"
,
"Agu"
),
(
9
,
"September"
,
"Sep"
),
(
10
,
"Oktober"
,
"Okt"
),
(
11
,
"Nopember"
,
"Nov"
),
(
12
,
"Desember"
,
"Des"
),
)
##########
# String #
##########
def
one_space
(
s
):
s
=
s
.
strip
()
while
s
.
find
(
' '
)
>
-
1
:
s
=
s
.
replace
(
' '
,
' '
)
return
s
def
to_str
(
v
):
typ
=
type
(
v
)
if
typ
==
DateType
:
return
dmy
(
v
)
if
typ
==
DateTimeType
:
return
dmyhms
(
v
)
if
v
==
0
:
return
'0'
if
typ
==
str
:
return
v
.
strip
()
elif
typ
==
bool
:
return
v
and
'1'
or
'0'
return
v
and
str
(
v
)
or
''
def
dict_to_str
(
d
):
r
=
{}
for
key
in
d
:
val
=
d
[
key
]
r
[
key
]
=
to_str
(
val
)
return
r
def
split
(
s
,
c
=
4
):
r
=
[]
while
s
:
t
=
s
[:
c
]
r
.
append
(
t
)
s
=
s
[
c
:]
return
' '
.
join
(
r
)
def
url_join
(
*
args
):
url
=
''
for
arg
in
args
:
if
arg
:
aurl
=
arg
.
strip
(
'/'
)
url
=
url
+
aurl
+
'/'
url
=
url
.
rstrip
(
'/'
)
return
url
def
upper_dict
(
d
):
new_dict
=
dict
((
str
(
k
)
.
upper
(),
v
)
for
k
,
v
in
d
.
items
())
return
new_dict
########
# File #
########
# http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
def
get_random_string
(
width
=
6
):
return
''
.
join
(
choice
(
ascii_uppercase
+
ascii_lowercase
+
digits
)
\
for
_
in
range
(
width
))
def
get_random_number
(
width
=
12
):
return
''
.
join
(
choice
(
digits
)
\
for
_
in
range
(
width
))
def
get_ext
(
filename
):
return
os
.
path
.
splitext
(
filename
)[
-
1
]
def
get_filename
(
filename
):
return
""
.
join
(
os
.
path
.
splitext
(
filename
)[:
-
1
])
def
file_type
(
filename
):
ctype
,
encoding
=
mimetypes
.
guess_type
(
filename
)
if
ctype
is
None
or
encoding
is
not
None
:
ctype
=
'application/octet-stream'
return
ctype
class
SaveFile
(
object
):
def
__init__
(
self
,
dir_path
):
self
.
dir_path
=
dir_path
if
not
os
.
path
.
exists
(
dir_path
):
os
.
makedirs
(
dir_path
)
# Unchanged file extension, and make file prefix unique with sequential
# number.
def
create_fullpath
(
self
,
ext
=
''
):
while
True
:
filename
=
get_random_string
(
32
)
+
ext
full_path
=
os
.
path
.
join
(
self
.
dir_path
,
filename
)
if
not
os
.
path
.
exists
(
full_path
):
return
full_path
def
save
(
self
,
content
,
ext
=
''
,
filename
=
None
):
if
filename
:
fullpath
=
os
.
path
.
join
(
self
.
dir_path
,
filename
)
else
:
fullpath
=
self
.
create_fullpath
(
ext
=
ext
)
f
=
open
(
fullpath
,
'wb'
)
if
type
(
content
)
==
io
.
BytesIO
:
f
.
write
(
content
.
getbuffer
())
else
:
f
.
write
(
content
)
f
.
close
()
return
fullpath
class
InvalidExtension
(
Exception
):
def
__init__
(
self
,
exts
):
self
.
error
=
f
"Supported extensions {exts}"
class
MemoryTmpStore
(
dict
):
""" Instances of this class implement the
:class:`defWorm.interfaces.FileUploadTempStore` interface"""
def
preview_url
(
self
,
uid
):
return
None
mem_tmp_store
=
MemoryTmpStore
()
img_exts
=
[
'.png'
,
'.jpg'
,
'.pdf'
,
'.jpeg'
]
def
image_validator
(
node
,
value
):
ext
=
get_ext
(
value
[
"filename"
])
if
ext
not
in
img_exts
:
raise
colander
.
Invalid
(
node
,
f
'Extension harus salahsatu dari {img_exts}'
)
def
file_response
(
request
,
f
,
filename
,
type
):
response
=
request
.
response
response
.
content_type
=
str
(
type
)
response
.
content_disposition
=
'filename='
+
filename
response
.
write
(
f
.
read
())
return
response
class
Upload
(
SaveFile
):
def
save_to_file
(
self
,
input_file
,
ext
,
filename
=
None
):
if
filename
:
fullpath
=
os
.
path
.
join
(
self
.
dir_path
,
filename
,
ext
)
else
:
fullpath
=
self
.
create_fullpath
(
ext
)
output_file
=
open
(
fullpath
,
'wb'
)
input_file
.
seek
(
0
)
while
True
:
data
=
input_file
.
read
(
2
<<
16
)
if
not
data
:
break
output_file
.
write
(
data
)
output_file
.
close
()
return
fullpath
def
save
(
self
,
request
,
name
,
exts
=
None
,
filename
=
None
):
input_file
=
request
.
POST
[
name
]
.
file
ext
=
get_ext
(
request
.
POST
[
name
]
.
filename
)
if
exts
and
ext
not
in
exts
:
raise
InvalidExtension
(
exts
)
filename
=
self
.
save_to_file
(
input_file
,
ext
,
filename
=
filename
)
head
,
filename
=
os
.
path
.
split
(
filename
)
return
filename
def
save_fp
(
self
,
upload
):
if
'fp'
not
in
upload
or
upload
[
'fp'
]
==
b
''
or
not
upload
[
'fp'
]
or
not
upload
[
'size'
]:
if
"filename"
in
upload
:
return
upload
[
'filename'
]
return
filename
=
upload
[
'filename'
]
input_file
=
upload
[
'fp'
]
ext
=
get_ext
(
filename
)
filename
=
self
.
save_to_file
(
input_file
,
ext
)
return
os
.
path
.
split
(
filename
)[
1
]
def
saves
(
self
,
uploads
):
d
=
{}
for
upload
in
uploads
:
if
'fp'
not
in
upload
or
upload
[
'fp'
]
==
b
''
:
continue
filename
=
upload
[
'filename'
]
# input_file = upload['fp']
# ext = get_ext(filename)
# d[filename] = self.save_to_file(input_file, ext)
d
[
filename
]
=
self
.
save_fp
(
upload
)
return
d
def
download
(
self
,
request
,
filename
):
out_filename
=
os
.
path
.
join
(
self
.
dir_path
,
filename
)
if
not
os
.
path
.
exists
(
out_filename
):
raise
HTTPNotFound
ext
=
get_ext
(
filename
)[
1
:]
if
ext
in
[
'gif'
,
'png'
,
'jpeg'
,
'jpg'
]:
ext
=
'image/'
+
ext
with
open
(
out_filename
,
'rb'
)
as
f
:
return
file_response
(
request
,
f
,
filename
,
ext
)
class
UploadBin
(
Upload
):
"""
Compatibility to previous
"""
pass
class
CSVRenderer
(
object
):
def
__init__
(
self
,
info
):
pass
def
__call__
(
self
,
value
,
system
):
""" Returns a plain CSV-encoded string with content-type
``text/csv``. The content-type may be overridden by
setting ``request.response.content_type``."""
request
=
system
.
get
(
'request'
)
if
request
is
not
None
:
response
=
request
.
response
ct
=
response
.
content_type
if
ct
==
response
.
default_content_type
:
response
.
content_type
=
'text/csv'
# error ketika open csv menggunakan bytes
# karena tak mau hapus, maka dibikin try
try
:
fout
=
io
.
BytesIO
()
#
fcsv
=
csv
.
writer
(
fout
,
delimiter
=
','
,
quotechar
=
'"'
,
quoting
=
csv
.
QUOTE_MINIMAL
)
fcsv
.
writerow
(
value
.
get
(
'header'
,
[]))
fcsv
.
writerows
(
value
.
get
(
'rows'
,
[]))
except
:
fout
=
io
.
StringIO
()
fcsv
=
csv
.
writer
(
fout
,
delimiter
=
','
,
quotechar
=
'"'
,
quoting
=
csv
.
QUOTE_MINIMAL
)
fcsv
.
writerow
(
value
.
get
(
'header'
,
[]))
fcsv
.
writerows
(
value
.
get
(
'rows'
,
[]))
return
fout
.
getvalue
()
# # Data Table Function
# def _DTstrftime(chain):
# ret = chain and datetime.strftime(chain, '%d-%m-%Y')
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTnumber_format(chain):
# import locale
# locale.setlocale(locale.LC_ALL, 'id_ID.utf8')
# ret = locale.format("%d", chain, grouping=True)
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTupper(chain):
# ret = chain.upper()
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTstatus(chain):
# if chain:
# ret = 'Aktif'
# else:
# ret = 'Mati'
# if ret:
# return ret
# else:
# return chain
##########
# String #
##########
def
clean
(
s
):
r
=
''
for
ch
in
s
:
asc
=
ord
(
ch
)
if
asc
>
126
or
asc
<
32
:
ch
=
' '
r
+=
ch
return
r
# def to_str(s):
# s = s or ''
# s = str(s)
# return clean(s)
def
left
(
s
,
width
):
s
=
to_str
(
s
)
return
s
.
ljust
(
width
)[:
width
]
def
right
(
s
,
width
):
s
=
to_str
(
s
)
return
s
.
zfill
(
width
)[:
width
]
##################
# Data Structure #
##################
class
FixLength
(
object
):
def
__init__
(
self
,
struct
):
self
.
struct
=
self
.
fields
=
None
self
.
set_struct
(
struct
)
def
set_struct
(
self
,
struct
):
self
.
struct
=
struct
self
.
fields
=
{}
for
s
in
struct
:
name
=
s
[
0
]
size
=
s
[
1
:]
and
s
[
1
]
or
1
typ
=
s
[
2
:]
and
s
[
2
]
or
'A'
# N: numeric, A: alphanumeric
self
.
fields
[
name
]
=
{
'value'
:
None
,
'type'
:
typ
,
'size'
:
size
}
def
set
(
self
,
name
,
value
):
self
.
fields
[
name
][
'value'
]
=
value
def
get
(
self
,
name
):
v
=
self
.
fields
[
name
][
'value'
]
if
v
:
return
v
.
strip
()
return
''
def
__setitem__
(
self
,
name
,
value
):
self
.
set
(
name
,
value
)
def
__getitem__
(
self
,
name
):
return
self
.
get
(
name
)
def
get_raw
(
self
):
return
self
.
get_spliter
(
""
)
def
get_raw_dotted
(
self
):
return
self
.
get_dotted
()
def
get_value
(
self
,
name
,
typ
):
v
=
self
.
fields
[
name
][
'value'
]
if
v
and
typ
==
'N'
:
i
=
int
(
v
)
if
v
==
i
:
v
=
i
return
v
def
get_spliter
(
self
,
ch
):
"""
2022-09-12 Perbaikan return jika ch is none
"""
s
=
''
for
name
,
size
,
typ
in
self
.
struct
:
v
=
self
.
get_value
(
name
,
typ
)
pad_func
=
typ
==
'N'
and
right
or
left
s
+=
pad_func
(
v
,
size
)
s
+=
ch
if
ch
:
return
s
[:
-
1
]
return
s
def
get_dotted
(
self
):
return
self
.
get_spliter
(
'.'
)
def
set_raw
(
self
,
raw
):
awal
=
0
for
t
in
self
.
struct
:
name
=
t
[
0
]
size
=
t
[
1
:]
and
t
[
1
]
or
1
akhir
=
awal
+
size
value
=
raw
[
awal
:
akhir
]
if
not
value
:
return
self
.
set
(
name
,
value
)
awal
+=
size
return
True
def
from_dict
(
self
,
d
):
for
name
in
self
.
fields
:
if
name
in
d
and
d
[
name
]:
value
=
d
[
name
]
self
.
set
(
name
,
value
)
def
to_dict
(
self
):
r
=
{}
for
name
in
self
.
fields
:
r
[
name
]
=
self
.
get
(
name
)
return
r
def
length
(
self
):
return
len
(
self
.
get_raw
())
import
decimal
class
Encoder
(
JSONEncoder
):
"""Extends JSONEncoder for date and decimal types."""
def
push_date
(
self
,
d
):
"""Serialize the given datetime.date object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
return
"
%04
d-
%02
d-
%02
d"
%
(
d
.
year
,
d
.
month
,
d
.
day
)
def
push_timedelta
(
self
,
t
):
"""Serialize the given datetime.timedelta object to a JSON string."""
days
=
t
.
days
if
days
<
0
:
minus
=
"-"
days
=
-
days
-
1
seconds
=
24
*
60
*
60
-
t
.
seconds
else
:
minus
=
""
seconds
=
t
.
seconds
secs
=
seconds
%
60
seconds
/=
60
mins
=
seconds
%
60
hours
=
seconds
/
60
return
"
%
s
%
d:
%02
d:
%02
d:
%02
d"
%
(
minus
,
days
,
hours
,
mins
,
secs
)
def
push_time
(
self
,
t
):
"""Serialize the given datetime.time object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
return
"
%02
d:
%02
d:
%02
d"
%
(
t
.
hour
,
t
.
minute
,
t
.
second
)
def
push_datetime
(
self
,
dt
):
"""Serialize the given datetime.datetime object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
# Don't use strftime because that can't handle dates before 1900.
return
(
"
%04
d-
%02
d-
%02
dT
%02
d:
%02
d:
%02
d"
%
(
dt
.
year
,
dt
.
month
,
dt
.
day
,
dt
.
hour
,
dt
.
minute
,
dt
.
second
))
def
default
(
self
,
o
):
# We MUST check for a datetime.datetime instance before datetime.date.
# datetime.datetime is a subclass of datetime.date, and therefore
# instances of it are also instances of datetime.date.
if
isinstance
(
o
,
datetime
.
datetime
):
return
self
.
push_datetime
(
o
)
elif
isinstance
(
o
,
datetime
.
date
):
return
self
.
push_date
(
o
)
elif
isinstance
(
o
,
datetime
.
timedelta
):
return
self
.
push_timedelta
(
o
)
elif
isinstance
(
o
,
datetime
.
time
):
return
self
.
push_time
(
o
)
elif
isinstance
(
o
,
decimal
.
Decimal
):
return
str
(
o
)
else
:
return
JSONEncoder
.
default
(
self
,
o
)
STATUS
=
(
(
1
,
'Aktif'
),
(
0
,
'Pasif'
)
)
INVOICE_STATUS
=
(
(
0
,
'Draft'
),
(
1
,
'Approved'
),
)
def
pbb_biller_split
():
settings
=
get_settings
()
if
not
'pbb_biller'
in
settings
:
return
None
,
None
biller
=
re
.
sub
(
'
\
D'
,
''
,
settings
[
'pbb_biller'
])
return
biller
[:
2
],
biller
[
2
:]
def
clear_null_value
(
values
):
# digunakan untuk menghapus dictionary yang value nya null
tobe_del
=
[]
for
value
in
values
:
if
not
values
[
value
]:
tobe_del
.
append
(
value
)
for
value
in
tobe_del
:
del
values
[
value
]
return
values
def
replace_char
(
text
,
start
,
stop
,
char
=
"X"
):
x
=
char
*
(
stop
-
start
+
1
)
s
=
text
[:
start
]
+
x
+
text
[
stop
:]
# print "*** DEBUG ", text, s
return
s
##################
# Email #
##################
# import smtplib
# from email.MIMEMultipart import MIMEMultipart
# from email.MIMEText import MIMEText
# from email.MIMEBase import MIMEBase
# import base64
SCOPES
=
[
'https://www.googleapis.com/auth/gmail.readonly'
,
'https://www.googleapis.com/auth/userinfo.email'
,
'https://www.googleapis.com/auth/userinfo.profile'
,
# Add other requested scopes.
]
# def send_email(to_addr_list, subject, message, cc_addr_list=None,from_addr=None,
# login=None, password=None, file=None,
# smtp_server='smtp.gmail.com:587'):
# settings = get_settings()
# if settings['devel']=='true':
# return
#
# if not login:
#
# login = 'center.email' in settings and settings['center.email'] or None
# # if login[-9:]=="gmail.com":
# # return send_gmsg(to=to_addr_list, subject=subject, message_text=message, file=file,)
#
# if not password:
# password = 'center.email_password' in settings and settings['center.email_password'] or None
# if not login or not password:
# return 'Error smtp login or Password'
#
# if 'center.smtp_server' in settings and settings['center.smtp_server']:
# smtp_server = settings['center.smtp_server']
#
# if not from_addr:
# from_addr = login
#
# header = 'From: %s\r\n' % from_addr
# header += 'To: %s \r\n' % ','.join(to_addr_list)
# if cc_addr_list:
# header += 'Cc: %s \r\n' % ','.join(cc_addr_list)
# header += 'Subject: %s \r\n' % subject
# message = header + message
# smtp, port = smtp_server.split(':')
# port = int(port)
#
# #print '>>>>>>>>>>>>', smtp, port, login,password
# try:
# #print smtp_server, smtp, port
# server = smtplib.SMTP(smtp,port) #smtp_server)
# server.ehlo()
# if port==587:
# server.starttls()
#
# server.login(login,password)
# problems = server.sendmail(from_addr, to_addr_list, message)
# server.quit()
# return
# except Exception as e:
# return str(e)
#
# def create_message(sender, to, subject, message_text, file=None):
# """Create a message for an email.
#
# Args:
# sender: Email address of the sender.
# to: Email address of the receiver.
# subject: The subject of the email message.
# message_text: The text of the email message.
# file: The path to the file to be attached.
#
# Returns:
# An object containing a base64url encoded email object.
# """
# if file:
# message = MIMEMultipart()
# msg = MIMEText(message_text)
# else:
# message = MIMEText(message_text)
#
# message['to'] = to
# message['from'] = sender
# message['subject'] = subject
#
# if file:
# message.attach(msg)
# content_type, encoding = mimetypes.guess_type(file)
#
# if content_type is None or encoding is not None:
# content_type = 'application/octet-stream'
# main_type, sub_type = content_type.split('/', 1)
# if main_type == 'text':
# fp = open(file, 'rb')
# msg = MIMEText(fp.read(), _subtype=sub_type)
# fp.close()
# elif main_type == 'image':
# fp = open(file, 'rb')
# msg = MIMEImage(fp.read(), _subtype=sub_type)
# fp.close()
# elif main_type == 'audio':
# fp = open(file, 'rb')
# msg = MIMEAudio(fp.read(), _subtype=sub_type)
# fp.close()
# else:
# fp = open(file, 'rb')
# msg = MIMEBase(main_type, sub_type)
# msg.set_payload(fp.read())
# fp.close()
# filename = os.path.basename(file)
# msg.add_header('Content-Disposition', 'attachment', filename=filename)
# message.attach(msg)
#
# return {'raw': base64.urlsafe_b64encode(message.as_string())}
#
#
#
# def get_service():
# from googleapiclient.discovery import build
# from httplib2 import Http
# from oauth2client import file, client, tools
# """Shows basic usage of the Gmail API.
# Lists the user's Gmail labels.
# """
# # The file token.json stores the user's access and refresh tokens, and is
# # created automatically when the authorization flow completes for the first
# # time.
# store = file.Storage('token.json')
# creds = store.get()
# if not creds or creds.invalid:
# flow = client.flow_from_clientsecrets('./credentials.json', SCOPES)
# creds = tools.run_flow(flow, store)
# return build('gmail', 'v1', http=creds.authorize(Http()))
#
# def send_gmsg(to, subject, message_text, file=None):
# settings = get_settings()
# sender = 'center.email' in settings and settings['center.email'] or None
#
# message = create_message(sender, to, subject, message_text, file=None)
# service = get_service()
# try:
# message = (service.users().messages().send(userId=user_id, body=message)
# .execute())
# return message
# except errors.HttpError as error:
# pass
#
# http://python.web.id/angka-terbilang-pada-python/#.VQpS8s2sXQo
satuan
=
[
''
,
'satu'
,
'dua'
,
'tiga'
,
'empat'
,
'lima'
,
'enam'
,
'tujuh'
,
'delapan'
,
'sembilan'
,
'sepuluh'
,
'sebelas'
]
def
terbilang_
(
n
):
n
=
int
(
n
)
if
n
>=
1000000000
:
hasil
=
terbilang_
(
n
/
1000000000
)
+
[
'milyar'
]
+
terbilang_
(
n
%
100000000
)
elif
n
>
1000000
:
hasil
=
terbilang_
(
n
/
1000000
)
+
[
'juta'
]
+
terbilang_
(
n
%
1000000
)
elif
n
>=
2000
:
hasil
=
terbilang_
(
n
/
1000
)
+
[
'ribu'
]
+
terbilang_
(
n
%
1000
)
elif
n
>=
1000
:
hasil
=
[
'seribu'
]
+
terbilang_
(
n
-
1000
)
elif
n
>=
200
:
hasil
=
terbilang_
(
n
/
100
)
+
[
'ratus'
]
+
terbilang_
(
n
%
100
)
elif
n
>=
100
:
hasil
=
[
'seratus'
]
+
terbilang_
(
n
-
100
)
elif
n
>=
20
:
hasil
=
terbilang_
(
n
/
10
)
+
[
'puluh'
]
+
terbilang_
(
n
%
10
)
elif
n
>=
12
:
hasil
=
terbilang_
(
n
%
10
)
+
[
'belas'
]
else
:
hasil
=
[
satuan
[
n
]]
print
(
'hasil'
)
return
hasil
def
terbilang
(
n
):
if
n
==
0
:
return
'nol'
t
=
terbilang_
(
n
)
while
''
in
t
:
t
.
remove
(
''
)
return
' '
.
join
(
t
)
def
number_only
(
value
):
return
re
.
sub
(
'
\
D'
,
""
,
value
)
def
set_user_log
(
message
,
request
,
logobj
=
None
,
user_name
=
None
):
if
not
logobj
:
logobj
=
log
if
not
request
:
logobj
.
info
(
"Request not Set"
)
return
if
not
user_name
:
user_name
=
request
and
request
.
user
and
request
.
user
.
user_name
or
None
if
not
user_name
:
logobj
.
info
(
"User Name not Set"
)
return
addr
=
request
.
client_addr
message
=
"User {} at Addr {}: {}"
.
format
(
user_name
,
addr
,
message
)
logobj
.
warning
(
message
)
def
create_static_path
(
config
,
folder
,
url
,
cache_max_age
=
3600
):
log
.
debug
(
"Folder to create:
%
s"
,
folder
)
if
not
os
.
path
.
exists
(
folder
):
try
:
os
.
makedirs
(
folder
)
except
Exception
as
e
:
log
.
debug
(
"User:
%
s"
,
os
.
path
.
expanduser
(
"~"
))
log
.
debug
(
"Error:
%
s"
,
str
(
e
))
config
.
add_static_view
(
url
,
path
=
folder
,
cache_max_age
=
cache_max_age
)
def
includeme
(
config
):
config
.
add_translation_dirs
(
'opensipkd.tools:locale/'
)
from
__future__
import
print_function
import
os
import
re
import
mimetypes
import
csv
import
calendar
import
datetime
from
random
import
choice
from
string
import
(
ascii_uppercase
,
ascii_lowercase
,
digits
,
)
import
locale
import
colander
import
pytz
import
io
from
pyramid.httpexceptions
import
HTTPNotFound
from
pyramid.threadlocal
import
get_current_registry
from
json
import
JSONEncoder
import
logging
log
=
logging
.
getLogger
(
__name__
)
################
# Phone number #
################
MSISDN_ALLOW_CHARS
=
map
(
lambda
x
:
str
(
x
),
range
(
10
))
# + ['+']
def
get_msisdn
(
msisdn
,
country
=
'+62'
):
"""
Digunakan untuk pengecekan no telp suatu negara
:param msisdn:
:param country:
:return: output kode negara+no msisdn
"""
# for ch in msisdn:
# if ch not in MSISDN_ALLOW_CHARS:
# raise Exception(f'{ch} not in {print(x for x in range(10))}')
try
:
i
=
int
(
msisdn
)
except
ValueError
as
e
:
log
.
info
(
e
)
return
if
not
i
:
raise
Exception
(
'MSISDN must filled'
)
if
len
(
str
(
i
))
<
7
:
raise
Exception
(
'MSISDN less then 7 number'
)
if
re
.
compile
(
r'^\+'
)
.
search
(
msisdn
):
return
msisdn
if
re
.
compile
(
r'^0'
)
.
search
(
msisdn
):
return
'
%
s
%
s'
%
(
country
,
msisdn
.
lstrip
(
'0'
))
################
# Money format #
################
def
should_int
(
value
):
int_
=
int
(
value
)
if
int_
==
value
:
return
int_
return
value
def
thousand
(
value
,
float_count
=
None
):
"""
Memisahkan ribuan
:param value:
:param float_count:
:return: string
"""
if
not
value
:
return
"0"
if
float_count
is
None
:
# autodetection
if
type
(
value
)
in
(
int
,
float
):
float_count
=
0
else
:
float_count
=
2
return
locale
.
format_string
(
'
%%
.
%
df'
%
float_count
,
value
,
True
)
def
money
(
value
,
float_count
=
None
,
currency
=
None
):
"""
Memisahkan ribuan dan menambahkan nama mata uang
:param value:
:param float_count:
:param currency:
:return:
"""
if
value
<
0
:
v
=
abs
(
value
)
format_
=
'(
%
s)'
else
:
v
=
value
format_
=
'
%
s'
if
currency
is
None
:
currency
=
locale
.
localeconv
()[
'currency_symbol'
]
s
=
' '
.
join
([
currency
,
thousand
(
v
,
float_count
)])
return
format_
%
s
def
round_up
(
n
):
i
=
int
(
n
)
if
n
==
i
:
return
i
if
n
>
i
:
return
i
+
1
return
i
-
1
###########
# Pyramid #
###########
def
get_settings
():
return
get_current_registry
()
.
settings
def
get_params
(
params
,
alternate
=
None
):
"""
Digunakan untuk mengambil nilai dari konfigurasi sesuai params yang disebut
:param params: variable
:param alternate: default apabila tidak ditemukan data/params
:return: value
contoh penggunaan:
get_params('devel', False)
"""
settings
=
get_settings
()
result
=
settings
and
params
in
settings
and
settings
[
params
]
.
strip
()
or
None
return
result
and
result
or
alternate
def
devel
():
settings
=
get_settings
()
is_devel
=
'devel'
in
settings
and
settings
[
'devel'
]
and
True
or
False
print
(
"DEBUG>>"
,
is_devel
)
return
is_devel
def
get_timezone
():
settings
=
get_settings
()
return
pytz
.
timezone
(
settings
[
'timezone'
])
def
get_modules
():
settings
=
get_settings
()
return
settings
[
'modules'
]
.
split
(
','
)
########
# Time #
########
one_second
=
datetime
.
timedelta
(
1.0
/
24
/
60
/
60
)
DateType
=
type
(
datetime
.
date
.
today
())
DateTimeType
=
type
(
datetime
.
datetime
.
now
())
TimeZoneFile
=
'/etc/timezone'
if
os
.
path
.
exists
(
TimeZoneFile
):
DefaultTimeZone
=
open
(
TimeZoneFile
)
.
read
()
.
strip
()
else
:
DefaultTimeZone
=
'Asia/Jakarta'
def
as_timezone
(
tz_date
):
localtz
=
get_timezone
()
if
not
tz_date
.
tzinfo
:
tz_date
=
create_datetime
(
tz_date
.
year
,
tz_date
.
month
,
tz_date
.
day
,
tz_date
.
hour
,
tz_date
.
minute
,
tz_date
.
second
,
tz_date
.
microsecond
)
return
tz_date
.
astimezone
(
localtz
)
def
create_datetime
(
year
,
month
,
day
,
hour
=
0
,
minute
=
7
,
second
=
0
,
microsecond
=
0
):
tz
=
get_timezone
()
dt
=
datetime
.
datetime
(
year
,
month
,
day
,
hour
,
minute
,
second
,
microsecond
)
return
tz
.
localize
(
dt
)
def
create_date
(
year
,
month
,
day
):
return
create_datetime
(
year
,
month
,
day
)
def
create_now
():
tz
=
get_timezone
()
return
datetime
.
datetime
.
now
(
tz
)
def
date_from_str
(
value
):
separator
=
None
value
=
value
.
split
()[
0
]
# dd-mm-yyyy HH:MM:SS
for
s
in
[
'-'
,
'/'
,
'.'
]:
if
value
.
find
(
s
)
>
-
1
:
separator
=
s
break
if
separator
:
t
=
list
(
map
(
lambda
x
:
int
(
x
),
value
.
split
(
separator
)))
y
,
m
,
d
=
t
[
2
],
t
[
1
],
t
[
0
]
if
d
>
999
:
# yyyy-mm-dd
y
,
d
=
d
,
y
else
:
y
,
m
,
d
=
int
(
value
[:
4
]),
int
(
value
[
4
:
6
]),
int
(
value
[
6
:])
return
datetime
.
date
(
y
,
m
,
d
)
def
time_from_str
(
value
):
# separator = ":"
value
=
value
.
split
()[
1
]
# dd-mm-yyyy HH:MM:SS
# return value.strptime('%H:%M:%S')
h
,
m
,
s
=
value
.
split
(
":"
)
# return datetime.time(h, m, s)
return
datetime
.
timedelta
(
hours
=
int
(
h
),
minutes
=
int
(
m
),
seconds
=
int
(
s
))
def
datetime_from_str
(
value
):
# separator = None
dt
=
date_from_str
(
value
)
tm
=
time_from_str
(
value
)
return
datetime
.
datetime
(
dt
.
year
,
dt
.
month
,
dt
.
day
)
+
tm
def
dmy
(
tgl
):
return
tgl
.
strftime
(
'
%
d-
%
m-
%
Y'
)
def
hms
(
tgl
):
return
tgl
.
strftime
(
'
%
H:
%
M:
%
S'
)
def
dmy_to_date
(
tgl
):
return
datetime
.
datetime
.
strptime
(
tgl
,
'
%
d-
%
m-
%
Y'
)
def
dmyhms
(
tgl
):
return
tgl
.
strftime
(
'
%
d-
%
m-
%
Y
%
H:
%
M:
%
S'
)
def
ymd
(
tgl
):
return
tgl
.
strftime
(
'
%
Y-
%
m-
%
d'
)
def
ymdhms
(
tgl
):
return
tgl
.
strftime
(
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
)
def
dMy
(
tgl
):
return
str
(
tgl
.
day
)
+
' '
+
NAMA_BULAN
[
tgl
.
month
][
1
]
+
' '
+
str
(
tgl
.
year
)
def
tampil_bulan
(
bulan
):
return
NAMA_BULAN
[
bulan
][
1
]
def
next_month
(
year
,
month
):
if
month
==
12
:
month
=
1
year
+=
1
else
:
month
+=
1
return
year
,
month
def
best_date
(
year
,
month
,
day
):
try
:
return
datetime
.
date
(
year
,
month
,
day
)
except
ValueError
:
last_day
=
calendar
.
monthrange
(
year
,
month
)[
1
]
return
datetime
.
date
(
year
,
month
,
last_day
)
def
next_month_day
(
year
,
month
,
day
):
year
,
month
=
next_month
(
year
,
month
)
return
best_date
(
year
,
month
,
day
)
def
count_day
(
date
):
date
=
str
(
date
)
# 2018-01-02 list
year
=
int
(
date
[:
4
])
if
date
[
5
]
==
'0'
:
month
=
int
(
date
[
6
])
else
:
month
=
int
(
date
[
5
:
7
])
count_day
=
calendar
.
monthrange
(
year
,
month
)
return
int
(
count_day
[
1
])
NAMA_BULAN
=
(
(
0
,
"--Bulan--"
),
(
1
,
"Januari"
,
"Jan"
),
(
2
,
"Februari"
,
"Feb"
),
(
3
,
"Maret"
,
"Mar"
),
(
4
,
"April"
,
"Apr"
),
(
5
,
"Mei"
,
"Mei"
),
(
6
,
"Juni"
,
"Jun"
),
(
7
,
"Juli"
,
"Jul"
),
(
8
,
"Agustus"
,
"Agu"
),
(
9
,
"September"
,
"Sep"
),
(
10
,
"Oktober"
,
"Okt"
),
(
11
,
"Nopember"
,
"Nov"
),
(
12
,
"Desember"
,
"Des"
),
)
##########
# String #
##########
def
one_space
(
s
):
s
=
s
.
strip
()
while
s
.
find
(
' '
)
>
-
1
:
s
=
s
.
replace
(
' '
,
' '
)
return
s
def
to_str
(
v
):
typ
=
type
(
v
)
if
typ
==
DateType
:
return
dmy
(
v
)
if
typ
==
DateTimeType
:
return
dmyhms
(
v
)
if
v
==
0
:
return
'0'
if
typ
==
str
:
return
v
.
strip
()
elif
typ
==
bool
:
return
v
and
'1'
or
'0'
return
v
and
str
(
v
)
or
''
def
dict_to_str
(
d
):
r
=
{}
for
key
in
d
:
val
=
d
[
key
]
r
[
key
]
=
to_str
(
val
)
return
r
def
split
(
s
,
c
=
4
):
r
=
[]
while
s
:
t
=
s
[:
c
]
r
.
append
(
t
)
s
=
s
[
c
:]
return
' '
.
join
(
r
)
def
url_join
(
*
args
):
url
=
''
for
arg
in
args
:
if
arg
:
aurl
=
arg
.
strip
(
'/'
)
url
=
url
+
aurl
+
'/'
url
=
url
.
rstrip
(
'/'
)
return
url
def
upper_dict
(
d
):
new_dict
=
dict
((
str
(
k
)
.
upper
(),
v
)
for
k
,
v
in
d
.
items
())
return
new_dict
########
# File #
########
# http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
def
get_random_string
(
width
=
6
):
return
''
.
join
(
choice
(
ascii_uppercase
+
ascii_lowercase
+
digits
)
\
for
_
in
range
(
width
))
def
get_random_number
(
width
=
12
):
return
''
.
join
(
choice
(
digits
)
\
for
_
in
range
(
width
))
def
get_ext
(
filename
):
return
os
.
path
.
splitext
(
filename
)[
-
1
]
def
get_filename
(
filename
):
return
""
.
join
(
os
.
path
.
splitext
(
filename
)[:
-
1
])
def
file_type
(
filename
):
ctype
,
encoding
=
mimetypes
.
guess_type
(
filename
)
if
ctype
is
None
or
encoding
is
not
None
:
ctype
=
'application/octet-stream'
return
ctype
class
SaveFile
(
object
):
def
__init__
(
self
,
dir_path
):
self
.
dir_path
=
dir_path
if
not
os
.
path
.
exists
(
dir_path
):
os
.
makedirs
(
dir_path
)
# Unchanged file extension, and make file prefix unique with sequential
# number.
def
create_fullpath
(
self
,
ext
=
''
):
while
True
:
filename
=
get_random_string
(
32
)
+
ext
full_path
=
os
.
path
.
join
(
self
.
dir_path
,
filename
)
if
not
os
.
path
.
exists
(
full_path
):
return
full_path
def
save
(
self
,
content
,
ext
=
''
,
filename
=
None
):
if
filename
:
fullpath
=
os
.
path
.
join
(
self
.
dir_path
,
filename
)
else
:
fullpath
=
self
.
create_fullpath
(
ext
=
ext
)
f
=
open
(
fullpath
,
'wb'
)
if
type
(
content
)
==
io
.
BytesIO
:
f
.
write
(
content
.
getbuffer
())
else
:
f
.
write
(
content
)
f
.
close
()
return
fullpath
class
InvalidExtension
(
Exception
):
def
__init__
(
self
,
exts
):
self
.
error
=
f
"Supported extensions {exts}"
class
MemoryTmpStore
(
dict
):
""" Instances of this class implement the
:class:`defWorm.interfaces.FileUploadTempStore` interface"""
def
preview_url
(
self
,
uid
):
return
None
mem_tmp_store
=
MemoryTmpStore
()
img_exts
=
[
'.png'
,
'.jpg'
,
'.pdf'
,
'.jpeg'
]
def
image_validator
(
node
,
value
):
ext
=
get_ext
(
value
[
"filename"
])
if
ext
not
in
img_exts
:
raise
colander
.
Invalid
(
node
,
f
'Extension harus salahsatu dari {img_exts}'
)
def
file_response
(
request
,
f
,
filename
,
type
):
response
=
request
.
response
response
.
content_type
=
str
(
type
)
response
.
content_disposition
=
'filename='
+
filename
response
.
write
(
f
.
read
())
return
response
class
Upload
(
SaveFile
):
def
save_to_file
(
self
,
input_file
,
ext
,
filename
=
None
):
if
filename
:
fullpath
=
os
.
path
.
join
(
self
.
dir_path
,
filename
,
ext
)
else
:
fullpath
=
self
.
create_fullpath
(
ext
)
output_file
=
open
(
fullpath
,
'wb'
)
input_file
.
seek
(
0
)
while
True
:
data
=
input_file
.
read
(
2
<<
16
)
if
not
data
:
break
output_file
.
write
(
data
)
output_file
.
close
()
return
fullpath
def
save
(
self
,
request
,
name
,
exts
=
None
,
filename
=
None
):
input_file
=
request
.
POST
[
name
]
.
file
ext
=
get_ext
(
request
.
POST
[
name
]
.
filename
)
if
exts
and
ext
not
in
exts
:
raise
InvalidExtension
(
exts
)
filename
=
self
.
save_to_file
(
input_file
,
ext
,
filename
=
filename
)
head
,
filename
=
os
.
path
.
split
(
filename
)
return
filename
def
save_fp
(
self
,
upload
):
if
'fp'
not
in
upload
or
upload
[
'fp'
]
==
b
''
or
not
upload
[
'fp'
]
or
not
\
upload
[
'size'
]:
if
"filename"
in
upload
:
return
upload
[
'filename'
]
return
filename
=
upload
[
'filename'
]
input_file
=
upload
[
'fp'
]
ext
=
get_ext
(
filename
)
filename
=
self
.
save_to_file
(
input_file
,
ext
)
return
os
.
path
.
split
(
filename
)[
1
]
def
saves
(
self
,
uploads
):
d
=
{}
for
upload
in
uploads
:
if
'fp'
not
in
upload
or
upload
[
'fp'
]
==
b
''
:
continue
filename
=
upload
[
'filename'
]
# input_file = upload['fp']
# ext = get_ext(filename)
# d[filename] = self.save_to_file(input_file, ext)
d
[
filename
]
=
self
.
save_fp
(
upload
)
return
d
def
download
(
self
,
request
,
filename
):
out_filename
=
os
.
path
.
join
(
self
.
dir_path
,
filename
)
if
not
os
.
path
.
exists
(
out_filename
):
raise
HTTPNotFound
ext
=
get_ext
(
filename
)[
1
:]
if
ext
in
[
'gif'
,
'png'
,
'jpeg'
,
'jpg'
]:
ext
=
'image/'
+
ext
with
open
(
out_filename
,
'rb'
)
as
f
:
return
file_response
(
request
,
f
,
filename
,
ext
)
class
UploadBin
(
Upload
):
"""
Compatibility to previous
"""
pass
class
CSVRenderer
(
object
):
def
__init__
(
self
,
info
):
pass
def
__call__
(
self
,
value
,
system
):
""" Returns a plain CSV-encoded string with content-type
``text/csv``. The content-type may be overridden by
setting ``request.response.content_type``."""
request
=
system
.
get
(
'request'
)
if
request
is
not
None
:
response
=
request
.
response
ct
=
response
.
content_type
if
ct
==
response
.
default_content_type
:
response
.
content_type
=
'text/csv'
# error ketika open csv menggunakan bytes
# karena tak mau hapus, maka dibikin try
try
:
fout
=
io
.
BytesIO
()
#
fcsv
=
csv
.
writer
(
fout
,
delimiter
=
','
,
quotechar
=
'"'
,
quoting
=
csv
.
QUOTE_MINIMAL
)
fcsv
.
writerow
(
value
.
get
(
'header'
,
[]))
fcsv
.
writerows
(
value
.
get
(
'rows'
,
[]))
except
:
fout
=
io
.
StringIO
()
fcsv
=
csv
.
writer
(
fout
,
delimiter
=
','
,
quotechar
=
'"'
,
quoting
=
csv
.
QUOTE_MINIMAL
)
fcsv
.
writerow
(
value
.
get
(
'header'
,
[]))
fcsv
.
writerows
(
value
.
get
(
'rows'
,
[]))
return
fout
.
getvalue
()
# # Data Table Function
# def _DTstrftime(chain):
# ret = chain and datetime.strftime(chain, '%d-%m-%Y')
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTnumber_format(chain):
# import locale
# locale.setlocale(locale.LC_ALL, 'id_ID.utf8')
# ret = locale.format("%d", chain, grouping=True)
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTupper(chain):
# ret = chain.upper()
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTstatus(chain):
# if chain:
# ret = 'Aktif'
# else:
# ret = 'Mati'
# if ret:
# return ret
# else:
# return chain
##########
# String #
##########
def
clean
(
s
):
r
=
''
for
ch
in
s
:
asc
=
ord
(
ch
)
if
asc
>
126
or
asc
<
32
:
ch
=
' '
r
+=
ch
return
r
# def to_str(s):
# s = s or ''
# s = str(s)
# return clean(s)
def
left
(
s
,
width
):
s
=
to_str
(
s
)
return
s
.
ljust
(
width
)[:
width
]
def
right
(
s
,
width
):
s
=
to_str
(
s
)
return
s
.
zfill
(
width
)[:
width
]
##################
# Data Structure #
##################
class
FixLength
(
object
):
def
__init__
(
self
,
struct
):
self
.
struct
=
self
.
fields
=
None
self
.
set_struct
(
struct
)
def
set_struct
(
self
,
struct
):
self
.
struct
=
struct
self
.
fields
=
{}
for
s
in
struct
:
name
=
s
[
0
]
size
=
s
[
1
:]
and
s
[
1
]
or
1
typ
=
s
[
2
:]
and
s
[
2
]
or
'A'
# N: numeric, A: alphanumeric
self
.
fields
[
name
]
=
{
'value'
:
None
,
'type'
:
typ
,
'size'
:
size
}
def
set
(
self
,
name
,
value
):
self
.
fields
[
name
][
'value'
]
=
value
def
get
(
self
,
name
):
v
=
self
.
fields
[
name
][
'value'
]
if
v
:
return
v
.
strip
()
return
''
def
__setitem__
(
self
,
name
,
value
):
self
.
set
(
name
,
value
)
def
__getitem__
(
self
,
name
):
return
self
.
get
(
name
)
def
get_raw
(
self
):
return
self
.
get_spliter
(
""
)
def
get_raw_dotted
(
self
):
return
self
.
get_dotted
()
def
get_value
(
self
,
name
,
typ
):
v
=
self
.
fields
[
name
][
'value'
]
if
v
and
typ
==
'N'
:
i
=
int
(
v
)
if
v
==
i
:
v
=
i
return
v
def
get_spliter
(
self
,
ch
):
"""
2022-09-12 Perbaikan return jika ch is none
"""
s
=
''
for
name
,
size
,
typ
in
self
.
struct
:
v
=
self
.
get_value
(
name
,
typ
)
pad_func
=
typ
==
'N'
and
right
or
left
s
+=
pad_func
(
v
,
size
)
s
+=
ch
if
ch
:
return
s
[:
-
1
]
return
s
def
get_dotted
(
self
):
return
self
.
get_spliter
(
'.'
)
def
set_raw
(
self
,
raw
):
awal
=
0
for
t
in
self
.
struct
:
name
=
t
[
0
]
size
=
t
[
1
:]
and
t
[
1
]
or
1
akhir
=
awal
+
size
value
=
raw
[
awal
:
akhir
]
if
not
value
:
return
self
.
set
(
name
,
value
)
awal
+=
size
return
True
def
from_dict
(
self
,
d
):
for
name
in
self
.
fields
:
if
name
in
d
and
d
[
name
]:
value
=
d
[
name
]
self
.
set
(
name
,
value
)
def
to_dict
(
self
):
r
=
{}
for
name
in
self
.
fields
:
r
[
name
]
=
self
.
get
(
name
)
return
r
def
length
(
self
):
return
len
(
self
.
get_raw
())
import
decimal
class
Encoder
(
JSONEncoder
):
"""Extends JSONEncoder for date and decimal types."""
def
push_date
(
self
,
d
):
"""Serialize the given datetime.date object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
return
"
%04
d-
%02
d-
%02
d"
%
(
d
.
year
,
d
.
month
,
d
.
day
)
def
push_timedelta
(
self
,
t
):
"""Serialize the given datetime.timedelta object to a JSON string."""
days
=
t
.
days
if
days
<
0
:
minus
=
"-"
days
=
-
days
-
1
seconds
=
24
*
60
*
60
-
t
.
seconds
else
:
minus
=
""
seconds
=
t
.
seconds
secs
=
seconds
%
60
seconds
/=
60
mins
=
seconds
%
60
hours
=
seconds
/
60
return
"
%
s
%
d:
%02
d:
%02
d:
%02
d"
%
(
minus
,
days
,
hours
,
mins
,
secs
)
def
push_time
(
self
,
t
):
"""Serialize the given datetime.time object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
return
"
%02
d:
%02
d:
%02
d"
%
(
t
.
hour
,
t
.
minute
,
t
.
second
)
def
push_datetime
(
self
,
dt
):
"""Serialize the given datetime.datetime object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
# Don't use strftime because that can't handle dates before 1900.
return
(
"
%04
d-
%02
d-
%02
dT
%02
d:
%02
d:
%02
d"
%
(
dt
.
year
,
dt
.
month
,
dt
.
day
,
dt
.
hour
,
dt
.
minute
,
dt
.
second
))
def
default
(
self
,
o
):
# We MUST check for a datetime.datetime instance before datetime.date.
# datetime.datetime is a subclass of datetime.date, and therefore
# instances of it are also instances of datetime.date.
if
isinstance
(
o
,
datetime
.
datetime
):
return
self
.
push_datetime
(
o
)
elif
isinstance
(
o
,
datetime
.
date
):
return
self
.
push_date
(
o
)
elif
isinstance
(
o
,
datetime
.
timedelta
):
return
self
.
push_timedelta
(
o
)
elif
isinstance
(
o
,
datetime
.
time
):
return
self
.
push_time
(
o
)
elif
isinstance
(
o
,
decimal
.
Decimal
):
return
str
(
o
)
else
:
return
JSONEncoder
.
default
(
self
,
o
)
STATUS
=
(
(
1
,
'Aktif'
),
(
0
,
'Pasif'
)
)
INVOICE_STATUS
=
(
(
0
,
'Draft'
),
(
1
,
'Approved'
),
)
def
pbb_biller_split
():
settings
=
get_settings
()
if
not
'pbb_biller'
in
settings
:
return
None
,
None
biller
=
re
.
sub
(
'
\
D'
,
''
,
settings
[
'pbb_biller'
])
return
biller
[:
2
],
biller
[
2
:]
def
clear_null_value
(
values
):
# digunakan untuk menghapus dictionary yang value nya null
tobe_del
=
[]
for
value
in
values
:
if
not
values
[
value
]:
tobe_del
.
append
(
value
)
for
value
in
tobe_del
:
del
values
[
value
]
return
values
def
replace_char
(
text
,
start
,
stop
,
char
=
"X"
):
x
=
char
*
(
stop
-
start
+
1
)
s
=
text
[:
start
]
+
x
+
text
[
stop
:]
# print "*** DEBUG ", text, s
return
s
##################
# Email #
##################
# import smtplib
# from email.MIMEMultipart import MIMEMultipart
# from email.MIMEText import MIMEText
# from email.MIMEBase import MIMEBase
# import base64
SCOPES
=
[
'https://www.googleapis.com/auth/gmail.readonly'
,
'https://www.googleapis.com/auth/userinfo.email'
,
'https://www.googleapis.com/auth/userinfo.profile'
,
# Add other requested scopes.
]
# def send_email(to_addr_list, subject, message, cc_addr_list=None,from_addr=None,
# login=None, password=None, file=None,
# smtp_server='smtp.gmail.com:587'):
# settings = get_settings()
# if settings['devel']=='true':
# return
#
# if not login:
#
# login = 'center.email' in settings and settings['center.email'] or None
# # if login[-9:]=="gmail.com":
# # return send_gmsg(to=to_addr_list, subject=subject, message_text=message, file=file,)
#
# if not password:
# password = 'center.email_password' in settings and settings['center.email_password'] or None
# if not login or not password:
# return 'Error smtp login or Password'
#
# if 'center.smtp_server' in settings and settings['center.smtp_server']:
# smtp_server = settings['center.smtp_server']
#
# if not from_addr:
# from_addr = login
#
# header = 'From: %s\r\n' % from_addr
# header += 'To: %s \r\n' % ','.join(to_addr_list)
# if cc_addr_list:
# header += 'Cc: %s \r\n' % ','.join(cc_addr_list)
# header += 'Subject: %s \r\n' % subject
# message = header + message
# smtp, port = smtp_server.split(':')
# port = int(port)
#
# #print '>>>>>>>>>>>>', smtp, port, login,password
# try:
# #print smtp_server, smtp, port
# server = smtplib.SMTP(smtp,port) #smtp_server)
# server.ehlo()
# if port==587:
# server.starttls()
#
# server.login(login,password)
# problems = server.sendmail(from_addr, to_addr_list, message)
# server.quit()
# return
# except Exception as e:
# return str(e)
#
# def create_message(sender, to, subject, message_text, file=None):
# """Create a message for an email.
#
# Args:
# sender: Email address of the sender.
# to: Email address of the receiver.
# subject: The subject of the email message.
# message_text: The text of the email message.
# file: The path to the file to be attached.
#
# Returns:
# An object containing a base64url encoded email object.
# """
# if file:
# message = MIMEMultipart()
# msg = MIMEText(message_text)
# else:
# message = MIMEText(message_text)
#
# message['to'] = to
# message['from'] = sender
# message['subject'] = subject
#
# if file:
# message.attach(msg)
# content_type, encoding = mimetypes.guess_type(file)
#
# if content_type is None or encoding is not None:
# content_type = 'application/octet-stream'
# main_type, sub_type = content_type.split('/', 1)
# if main_type == 'text':
# fp = open(file, 'rb')
# msg = MIMEText(fp.read(), _subtype=sub_type)
# fp.close()
# elif main_type == 'image':
# fp = open(file, 'rb')
# msg = MIMEImage(fp.read(), _subtype=sub_type)
# fp.close()
# elif main_type == 'audio':
# fp = open(file, 'rb')
# msg = MIMEAudio(fp.read(), _subtype=sub_type)
# fp.close()
# else:
# fp = open(file, 'rb')
# msg = MIMEBase(main_type, sub_type)
# msg.set_payload(fp.read())
# fp.close()
# filename = os.path.basename(file)
# msg.add_header('Content-Disposition', 'attachment', filename=filename)
# message.attach(msg)
#
# return {'raw': base64.urlsafe_b64encode(message.as_string())}
#
#
#
# def get_service():
# from googleapiclient.discovery import build
# from httplib2 import Http
# from oauth2client import file, client, tools
# """Shows basic usage of the Gmail API.
# Lists the user's Gmail labels.
# """
# # The file token.json stores the user's access and refresh tokens, and is
# # created automatically when the authorization flow completes for the first
# # time.
# store = file.Storage('token.json')
# creds = store.get()
# if not creds or creds.invalid:
# flow = client.flow_from_clientsecrets('./credentials.json', SCOPES)
# creds = tools.run_flow(flow, store)
# return build('gmail', 'v1', http=creds.authorize(Http()))
#
# def send_gmsg(to, subject, message_text, file=None):
# settings = get_settings()
# sender = 'center.email' in settings and settings['center.email'] or None
#
# message = create_message(sender, to, subject, message_text, file=None)
# service = get_service()
# try:
# message = (service.users().messages().send(userId=user_id, body=message)
# .execute())
# return message
# except errors.HttpError as error:
# pass
#
# http://python.web.id/angka-terbilang-pada-python/#.VQpS8s2sXQo
satuan
=
[
''
,
'satu'
,
'dua'
,
'tiga'
,
'empat'
,
'lima'
,
'enam'
,
'tujuh'
,
'delapan'
,
'sembilan'
,
'sepuluh'
,
'sebelas'
]
def
terbilang_
(
n
):
n
=
int
(
n
)
if
n
>=
1000000000
:
hasil
=
terbilang_
(
n
/
1000000000
)
+
[
'milyar'
]
+
terbilang_
(
n
%
100000000
)
elif
n
>
1000000
:
hasil
=
terbilang_
(
n
/
1000000
)
+
[
'juta'
]
+
terbilang_
(
n
%
1000000
)
elif
n
>=
2000
:
hasil
=
terbilang_
(
n
/
1000
)
+
[
'ribu'
]
+
terbilang_
(
n
%
1000
)
elif
n
>=
1000
:
hasil
=
[
'seribu'
]
+
terbilang_
(
n
-
1000
)
elif
n
>=
200
:
hasil
=
terbilang_
(
n
/
100
)
+
[
'ratus'
]
+
terbilang_
(
n
%
100
)
elif
n
>=
100
:
hasil
=
[
'seratus'
]
+
terbilang_
(
n
-
100
)
elif
n
>=
20
:
hasil
=
terbilang_
(
n
/
10
)
+
[
'puluh'
]
+
terbilang_
(
n
%
10
)
elif
n
>=
12
:
hasil
=
terbilang_
(
n
%
10
)
+
[
'belas'
]
else
:
hasil
=
[
satuan
[
n
]]
print
(
'hasil'
)
return
hasil
def
terbilang
(
n
):
if
n
==
0
:
return
'nol'
t
=
terbilang_
(
n
)
while
''
in
t
:
t
.
remove
(
''
)
return
' '
.
join
(
t
)
def
number_only
(
value
):
return
re
.
sub
(
'
\
D'
,
""
,
value
)
def
set_user_log
(
message
,
request
,
logobj
=
None
,
user_name
=
None
):
if
not
logobj
:
logobj
=
log
if
not
request
:
logobj
.
info
(
"Request not Set"
)
return
if
not
user_name
:
user_name
=
request
and
request
.
user
and
request
.
user
.
user_name
or
None
if
not
user_name
:
logobj
.
info
(
"User Name not Set"
)
return
addr
=
request
.
client_addr
message
=
"User {} at Addr {}: {}"
.
format
(
user_name
,
addr
,
message
)
logobj
.
warning
(
message
)
def
create_static_path
(
config
,
folder
,
url
,
cache_max_age
=
3600
):
log
.
debug
(
"Folder to create:
%
s"
,
folder
)
if
not
os
.
path
.
exists
(
folder
):
try
:
os
.
makedirs
(
folder
)
except
Exception
as
e
:
log
.
debug
(
"User:
%
s"
,
os
.
path
.
expanduser
(
"~"
))
log
.
debug
(
"Error:
%
s"
,
str
(
e
))
config
.
add_static_view
(
url
,
path
=
folder
,
cache_max_age
=
cache_max_age
)
def
includeme
(
config
):
config
.
add_translation_dirs
(
'opensipkd.tools:locale/'
)
opensipkd/tools/tools.init
0 → 100644
View file @
b0a3cd2
from __future__ import print_function
import os
import re
import mimetypes
import csv
import calendar
import datetime
from random import choice
from string import (ascii_uppercase, ascii_lowercase, digits, )
import locale
import colander
import pytz
import io
from pyramid.httpexceptions import HTTPNotFound
from pyramid.threadlocal import get_current_registry
from json import JSONEncoder
import logging
log = logging.getLogger(__name__)
################
# Phone number #
################
MSISDN_ALLOW_CHARS = map(lambda x: str(x), range(10)) # + ['+']
def get_msisdn(msisdn, country='+62'):
"""
Digunakan untuk pengecekan no telp suatu negara
:param msisdn:
:param country:
:return: output kode negara+no msisdn
"""
# for ch in msisdn:
# if ch not in MSISDN_ALLOW_CHARS:
# raise Exception(f'{ch} not in {print(x for x in range(10))}')
try:
i = int(msisdn)
except ValueError as e:
log.info(e)
return
if not i:
raise Exception('MSISDN must filled')
if len(str(i)) < 7:
raise Exception('MSISDN less then 7 number')
if re.compile(r'^\+').search(msisdn):
return msisdn
if re.compile(r'^0').search(msisdn):
return '%s%s' % (country, msisdn.lstrip('0'))
################
# Money format #
################
def should_int(value):
int_ = int(value)
if int_ == value:
return int_
return value
def thousand(value, float_count=None):
"""
Memisahkan ribuan
:param value:
:param float_count:
:return: string
"""
if not value:
return "0"
if float_count is None: # autodetection
if type(value) in (int, float):
float_count = 0
else:
float_count = 2
return locale.format_string('%%.%df' % float_count, value, True)
def money(value, float_count=None, currency=None):
"""
Memisahkan ribuan dan menambahkan nama mata uang
:param value:
:param float_count:
:param currency:
:return:
"""
if value < 0:
v = abs(value)
format_ = '(%s)'
else:
v = value
format_ = '%s'
if currency is None:
currency = locale.localeconv()['currency_symbol']
s = ' '.join([currency, thousand(v, float_count)])
return format_ % s
def round_up(n):
i = int(n)
if n == i:
return i
if n > i:
return i + 1
return i - 1
###########
# Pyramid #
###########
def get_settings():
return get_current_registry().settings
def get_params(params, alternate=None):
"""
Digunakan untuk mengambil nilai dari konfigurasi sesuai params yang disebut
:param params: variable
:param alternate: default apabila tidak ditemukan data/params
:return: value
contoh penggunaan:
get_params('devel', False)
"""
settings = get_settings()
result = settings and params in settings and settings[
params].strip() or None
return result and result or alternate
def devel():
settings = get_settings()
is_devel = 'devel' in settings and settings['devel'] and True or False
print("DEBUG>>", is_devel)
return is_devel
def get_timezone():
settings = get_settings()
return pytz.timezone(settings['timezone'])
def get_modules():
settings = get_settings()
return settings['modules'].split(',')
########
# Time #
########
one_second = datetime.timedelta(1.0 / 24 / 60 / 60)
DateType = type(datetime.date.today())
DateTimeType = type(datetime.datetime.now())
TimeZoneFile = '/etc/timezone'
if os.path.exists(TimeZoneFile):
DefaultTimeZone = open(TimeZoneFile).read().strip()
else:
DefaultTimeZone = 'Asia/Jakarta'
def as_timezone(tz_date):
localtz = get_timezone()
if not tz_date.tzinfo:
tz_date = create_datetime(tz_date.year, tz_date.month, tz_date.day,
tz_date.hour, tz_date.minute, tz_date.second,
tz_date.microsecond)
return tz_date.astimezone(localtz)
def create_datetime(year, month, day, hour=0, minute=7, second=0,
microsecond=0):
tz = get_timezone()
dt = datetime.datetime(year, month, day, hour, minute, second, microsecond)
return tz.localize(dt)
def create_date(year, month, day):
return create_datetime(year, month, day)
def create_now():
tz = get_timezone()
return datetime.datetime.now(tz)
def date_from_str(value):
separator = None
value = value.split()[0] # dd-mm-yyyy HH:MM:SS
for s in ['-', '/', '.']:
if value.find(s) > -1:
separator = s
break
if separator:
t = list(map(lambda x: int(x), value.split(separator)))
y, m, d = t[2], t[1], t[0]
if d > 999: # yyyy-mm-dd
y, d = d, y
else:
y, m, d = int(value[:4]), int(value[4:6]), int(value[6:])
return datetime.date(y, m, d)
def time_from_str(value):
# separator = ":"
value = value.split()[1] # dd-mm-yyyy HH:MM:SS
# return value.strptime('%H:%M:%S')
h, m, s = value.split(":")
# return datetime.time(h, m, s)
return datetime.timedelta(hours=int(h), minutes=int(m), seconds=int(s))
def datetime_from_str(value):
# separator = None
dt = date_from_str(value)
tm = time_from_str(value)
return datetime.datetime(dt.year, dt.month, dt.day)+tm
def dmy(tgl):
return tgl.strftime('%d-%m-%Y')
def hms(tgl):
return tgl.strftime('%H:%M:%S')
def dmy_to_date(tgl):
return datetime.datetime.strptime(tgl, '%d-%m-%Y')
def dmyhms(tgl):
return tgl.strftime('%d-%m-%Y %H:%M:%S')
def ymd(tgl):
return tgl.strftime('%Y-%m-%d')
def ymdhms(tgl):
return tgl.strftime('%Y-%m-%d %H:%M:%S')
def dMy(tgl):
return str(tgl.day) + ' ' + NAMA_BULAN[tgl.month][1] + ' ' + str(tgl.year)
def tampil_bulan(bulan):
return NAMA_BULAN[bulan][1]
def next_month(year, month):
if month == 12:
month = 1
year += 1
else:
month += 1
return year, month
def best_date(year, month, day):
try:
return datetime.date(year, month, day)
except ValueError:
last_day = calendar.monthrange(year, month)[1]
return datetime.date(year, month, last_day)
def next_month_day(year, month, day):
year, month = next_month(year, month)
return best_date(year, month, day)
def count_day(date):
date = str(date) # 2018-01-02 list
year = int(date[:4])
if date[5] == '0':
month = int(date[6])
else:
month = int(date[5:7])
count_day = calendar.monthrange(year, month)
return int(count_day[1])
NAMA_BULAN = (
(0, "--Bulan--"),
(1, "Januari", "Jan"),
(2, "Februari", "Feb"),
(3, "Maret", "Mar"),
(4, "April", "Apr"),
(5, "Mei", "Mei"),
(6, "Juni", "Jun"),
(7, "Juli", "Jul"),
(8, "Agustus", "Agu"),
(9, "September", "Sep"),
(10, "Oktober", "Okt"),
(11, "Nopember", "Nov"),
(12, "Desember", "Des"),
)
##########
# String #
##########
def one_space(s):
s = s.strip()
while s.find(' ') > -1:
s = s.replace(' ', ' ')
return s
def to_str(v):
typ = type(v)
if typ == DateType:
return dmy(v)
if typ == DateTimeType:
return dmyhms(v)
if v == 0:
return '0'
if typ == str:
return v.strip()
elif typ == bool:
return v and '1' or '0'
return v and str(v) or ''
def dict_to_str(d):
r = {}
for key in d:
val = d[key]
r[key] = to_str(val)
return r
def split(s, c=4):
r = []
while s:
t = s[:c]
r.append(t)
s = s[c:]
return ' '.join(r)
def url_join(*args):
url = ''
for arg in args:
if arg:
aurl = arg.strip('/')
url = url + aurl + '/'
url = url.rstrip('/')
return url
def upper_dict(d):
new_dict = dict((str(k).upper(), v) for k, v in d.items())
return new_dict
########
# File #
########
# http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
def get_random_string(width=6):
return ''.join(choice(ascii_uppercase + ascii_lowercase + digits) \
for _ in range(width))
def get_random_number(width=12):
return ''.join(choice(digits) \
for _ in range(width))
def get_ext(filename):
return os.path.splitext(filename)[-1]
def get_filename(filename):
return "".join(os.path.splitext(filename)[:-1])
def file_type(filename):
ctype, encoding = mimetypes.guess_type(filename)
if ctype is None or encoding is not None:
ctype = 'application/octet-stream'
return ctype
class SaveFile(object):
def __init__(self, dir_path):
self.dir_path = dir_path
if not os.path.exists(dir_path):
os.makedirs(dir_path)
# Unchanged file extension, and make file prefix unique with sequential
# number.
def create_fullpath(self, ext=''):
while True:
filename = get_random_string() + ext
full_path = os.path.join(self.dir_path, filename)
if not os.path.exists(full_path):
return full_path
def save(self, content, ext='', filename=None):
if filename:
fullpath = os.path.join(self.dir_path, filename)
else:
fullpath = self.create_fullpath(ext=ext)
f = open(fullpath, 'wb')
if type(content) == io.BytesIO:
f.write(content.getbuffer())
else:
f.write(content)
f.close()
return fullpath
class InvalidExtension(Exception):
def __init__(self, exts):
self.error = f"Supported extensions {exts}"
class MemoryTmpStore(dict):
""" Instances of this class implement the
:class:`defWorm.interfaces.FileUploadTempStore` interface"""
def preview_url(self, uid):
return None
mem_tmp_store = MemoryTmpStore()
img_exts = ['.png', '.jpg', '.pdf', '.jpeg']
def image_validator(node, value):
ext = get_ext(value["filename"])
if ext not in img_exts:
raise colander.Invalid(node,
f'Extension harus salahsatu dari {img_exts}')
def file_response(request, f, filename, type):
response = request.response
response.content_type = str(type)
response.content_disposition = 'filename=' + filename
response.write(f.read())
return response
class Upload(SaveFile):
def save_to_file(self, input_file, ext, filename=None):
if filename:
fullpath = os.path.join(self.dir_path, filename, ext)
else:
fullpath = self.create_fullpath(ext)
output_file = open(fullpath, 'wb')
input_file.seek(0)
while True:
data = input_file.read(2 << 16)
if not data:
break
output_file.write(data)
output_file.close()
return fullpath
def save(self, request, name, exts=None, filename=None):
input_file = request.POST[name].file
ext = get_ext(request.POST[name].filename)
if exts and ext not in exts:
raise InvalidExtension(exts)
filename = self.save_to_file(input_file, ext, filename=filename)
head, filename = os.path.split(filename)
return filename
def save_fp(self, upload):
if 'fp' not in upload or upload['fp'] == b'':
if "filename" in upload:
return upload['filename']
return
filename = upload['filename']
input_file = upload['fp']
ext = get_ext(filename)
return self.save_to_file(input_file, ext)
def saves(self, uploads):
d = {}
for upload in uploads:
if 'fp' not in upload or upload['fp'] == b'':
continue
filename = upload['filename']
# input_file = upload['fp']
# ext = get_ext(filename)
# d[filename] = self.save_to_file(input_file, ext)
d[filename] = self.save_fp(upload)
return d
def download(self, request, filename):
out_filename = os.path.join(self.dir_path, filename)
if not os.path.exists(out_filename):
raise HTTPNotFound
ext = get_ext(filename)[1:]
if ext in ['gif', 'png', 'jpeg', 'jpg']:
ext = 'image/' + ext
with open(out_filename, 'rb') as f:
return file_response(request, f, filename, ext)
class UploadBin(Upload):
"""
Compatibility to previous
"""
pass
class CSVRenderer(object):
def __init__(self, info):
pass
def __call__(self, value, system):
""" Returns a plain CSV-encoded string with content-type
``text/csv``. The content-type may be overridden by
setting ``request.response.content_type``."""
request = system.get('request')
if request is not None:
response = request.response
ct = response.content_type
if ct == response.default_content_type:
response.content_type = 'text/csv'
# error ketika open csv menggunakan bytes
# karena tak mau hapus, maka dibikin try
try:
fout = io.BytesIO() #
fcsv = csv.writer(fout, delimiter=',', quotechar='"',
quoting=csv.QUOTE_MINIMAL)
fcsv.writerow(value.get('header', []))
fcsv.writerows(value.get('rows', []))
except:
fout = io.StringIO()
fcsv = csv.writer(fout, delimiter=',', quotechar='"',
quoting=csv.QUOTE_MINIMAL)
fcsv.writerow(value.get('header', []))
fcsv.writerows(value.get('rows', []))
return fout.getvalue()
# # Data Table Function
# def _DTstrftime(chain):
# ret = chain and datetime.strftime(chain, '%d-%m-%Y')
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTnumber_format(chain):
# import locale
# locale.setlocale(locale.LC_ALL, 'id_ID.utf8')
# ret = locale.format("%d", chain, grouping=True)
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTupper(chain):
# ret = chain.upper()
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTstatus(chain):
# if chain:
# ret = 'Aktif'
# else:
# ret = 'Mati'
# if ret:
# return ret
# else:
# return chain
##########
# String #
##########
def clean(s):
r = ''
for ch in s:
asc = ord(ch)
if asc > 126 or asc < 32:
ch = ' '
r += ch
return r
# def to_str(s):
# s = s or ''
# s = str(s)
# return clean(s)
def left(s, width):
s = to_str(s)
return s.ljust(width)[:width]
def right(s, width):
s = to_str(s)
return s.zfill(width)[:width]
##################
# Data Structure #
##################
class FixLength(object):
def __init__(self, struct):
self.struct = self.fields = None
self.set_struct(struct)
def set_struct(self, struct):
self.struct = struct
self.fields = {}
for s in struct:
name = s[0]
size = s[1:] and s[1] or 1
typ = s[2:] and s[2] or 'A' # N: numeric, A: alphanumeric
self.fields[name] = {'value': None, 'type': typ, 'size': size}
def set(self, name, value):
self.fields[name]['value'] = value
def get(self, name):
v = self.fields[name]['value']
if v:
return v.strip()
return ''
def __setitem__(self, name, value):
self.set(name, value)
def __getitem__(self, name):
return self.get(name)
def get_raw(self):
return self.get_spliter("")
def get_raw_dotted(self):
return self.get_dotted()
def get_value(self, name, typ):
v = self.fields[name]['value']
if v and typ == 'N':
i = int(v)
if v == i:
v = i
return v
def get_spliter(self, ch):
"""
2022-09-12 Perbaikan return jika ch is none
"""
s = ''
for name, size, typ in self.struct:
v = self.get_value(name, typ)
pad_func = typ == 'N' and right or left
s += pad_func(v, size)
s += ch
if ch:
return s[:-1]
return s
def get_dotted(self):
return self.get_spliter('.')
def set_raw(self, raw):
awal = 0
for t in self.struct:
name = t[0]
size = t[1:] and t[1] or 1
akhir = awal + size
value = raw[awal:akhir]
if not value:
return
self.set(name, value)
awal += size
return True
def from_dict(self, d):
for name in self.fields:
if name in d and d[name]:
value = d[name]
self.set(name, value)
def to_dict(self):
r = {}
for name in self.fields:
r[name] = self.get(name)
return r
def length(self):
return len(self.get_raw())
import decimal
class Encoder(JSONEncoder):
"""Extends JSONEncoder for date and decimal types."""
def push_date(self, d):
"""Serialize the given datetime.date object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
return "%04d-%02d-%02d" % (d.year, d.month, d.day)
def push_timedelta(self, t):
"""Serialize the given datetime.timedelta object to a JSON string."""
days = t.days
if days < 0:
minus = "-"
days = -days - 1
seconds = 24 * 60 * 60 - t.seconds
else:
minus = ""
seconds = t.seconds
secs = seconds % 60
seconds /= 60
mins = seconds % 60
hours = seconds / 60
return "%s%d:%02d:%02d:%02d" % (minus, days, hours, mins, secs)
def push_time(self, t):
"""Serialize the given datetime.time object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
return "%02d:%02d:%02d" % (t.hour, t.minute, t.second)
def push_datetime(self, dt):
"""Serialize the given datetime.datetime object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
# Don't use strftime because that can't handle dates before 1900.
return ("%04d-%02d-%02dT%02d:%02d:%02d" %
(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second))
def default(self, o):
# We MUST check for a datetime.datetime instance before datetime.date.
# datetime.datetime is a subclass of datetime.date, and therefore
# instances of it are also instances of datetime.date.
if isinstance(o, datetime.datetime):
return self.push_datetime(o)
elif isinstance(o, datetime.date):
return self.push_date(o)
elif isinstance(o, datetime.timedelta):
return self.push_timedelta(o)
elif isinstance(o, datetime.time):
return self.push_time(o)
elif isinstance(o, decimal.Decimal):
return str(o)
else:
return JSONEncoder.default(self, o)
STATUS = (
(1, 'Aktif'),
(0, 'Pasif')
)
INVOICE_STATUS = (
(0, 'Draft'),
(1, 'Approved'),
)
def pbb_biller_split():
settings = get_settings()
if not 'pbb_biller' in settings:
return None, None
biller = re.sub('\D', '', settings['pbb_biller'])
return biller[:2], biller[2:]
def clear_null_value(values):
# digunakan untuk menghapus dictionary yang value nya null
tobe_del = []
for value in values:
if not values[value]:
tobe_del.append(value)
for value in tobe_del:
del values[value]
return values
def replace_char(text, start, stop, char="X"):
x = char * (stop - start + 1)
s = text[:start] + x + text[stop:]
# print "*** DEBUG ", text, s
return s
##################
# Email #
##################
# import smtplib
# from email.MIMEMultipart import MIMEMultipart
# from email.MIMEText import MIMEText
# from email.MIMEBase import MIMEBase
# import base64
SCOPES = [
'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/userinfo.profile',
# Add other requested scopes.
]
# def send_email(to_addr_list, subject, message, cc_addr_list=None,from_addr=None,
# login=None, password=None, file=None,
# smtp_server='smtp.gmail.com:587'):
# settings = get_settings()
# if settings['devel']=='true':
# return
#
# if not login:
#
# login = 'center.email' in settings and settings['center.email'] or None
# # if login[-9:]=="gmail.com":
# # return send_gmsg(to=to_addr_list, subject=subject, message_text=message, file=file,)
#
# if not password:
# password = 'center.email_password' in settings and settings['center.email_password'] or None
# if not login or not password:
# return 'Error smtp login or Password'
#
# if 'center.smtp_server' in settings and settings['center.smtp_server']:
# smtp_server = settings['center.smtp_server']
#
# if not from_addr:
# from_addr = login
#
# header = 'From: %s\r\n' % from_addr
# header += 'To: %s \r\n' % ','.join(to_addr_list)
# if cc_addr_list:
# header += 'Cc: %s \r\n' % ','.join(cc_addr_list)
# header += 'Subject: %s \r\n' % subject
# message = header + message
# smtp, port = smtp_server.split(':')
# port = int(port)
#
# #print '>>>>>>>>>>>>', smtp, port, login,password
# try:
# #print smtp_server, smtp, port
# server = smtplib.SMTP(smtp,port) #smtp_server)
# server.ehlo()
# if port==587:
# server.starttls()
#
# server.login(login,password)
# problems = server.sendmail(from_addr, to_addr_list, message)
# server.quit()
# return
# except Exception as e:
# return str(e)
#
# def create_message(sender, to, subject, message_text, file=None):
# """Create a message for an email.
#
# Args:
# sender: Email address of the sender.
# to: Email address of the receiver.
# subject: The subject of the email message.
# message_text: The text of the email message.
# file: The path to the file to be attached.
#
# Returns:
# An object containing a base64url encoded email object.
# """
# if file:
# message = MIMEMultipart()
# msg = MIMEText(message_text)
# else:
# message = MIMEText(message_text)
#
# message['to'] = to
# message['from'] = sender
# message['subject'] = subject
#
# if file:
# message.attach(msg)
# content_type, encoding = mimetypes.guess_type(file)
#
# if content_type is None or encoding is not None:
# content_type = 'application/octet-stream'
# main_type, sub_type = content_type.split('/', 1)
# if main_type == 'text':
# fp = open(file, 'rb')
# msg = MIMEText(fp.read(), _subtype=sub_type)
# fp.close()
# elif main_type == 'image':
# fp = open(file, 'rb')
# msg = MIMEImage(fp.read(), _subtype=sub_type)
# fp.close()
# elif main_type == 'audio':
# fp = open(file, 'rb')
# msg = MIMEAudio(fp.read(), _subtype=sub_type)
# fp.close()
# else:
# fp = open(file, 'rb')
# msg = MIMEBase(main_type, sub_type)
# msg.set_payload(fp.read())
# fp.close()
# filename = os.path.basename(file)
# msg.add_header('Content-Disposition', 'attachment', filename=filename)
# message.attach(msg)
#
# return {'raw': base64.urlsafe_b64encode(message.as_string())}
#
#
#
# def get_service():
# from googleapiclient.discovery import build
# from httplib2 import Http
# from oauth2client import file, client, tools
# """Shows basic usage of the Gmail API.
# Lists the user's Gmail labels.
# """
# # The file token.json stores the user's access and refresh tokens, and is
# # created automatically when the authorization flow completes for the first
# # time.
# store = file.Storage('token.json')
# creds = store.get()
# if not creds or creds.invalid:
# flow = client.flow_from_clientsecrets('./credentials.json', SCOPES)
# creds = tools.run_flow(flow, store)
# return build('gmail', 'v1', http=creds.authorize(Http()))
#
# def send_gmsg(to, subject, message_text, file=None):
# settings = get_settings()
# sender = 'center.email' in settings and settings['center.email'] or None
#
# message = create_message(sender, to, subject, message_text, file=None)
# service = get_service()
# try:
# message = (service.users().messages().send(userId=user_id, body=message)
# .execute())
# return message
# except errors.HttpError as error:
# pass
#
# http://python.web.id/angka-terbilang-pada-python/#.VQpS8s2sXQo
satuan = ['', 'satu', 'dua', 'tiga', 'empat', 'lima', 'enam', 'tujuh',
'delapan', 'sembilan', 'sepuluh', 'sebelas']
def terbilang_(n):
n = int(n)
if n >= 1000000000:
hasil = terbilang_(n / 1000000000) + ['milyar'] + terbilang_(
n % 100000000)
elif n >= 1000000: #commit by taufik
hasil = terbilang_(n / 1000000) + ['juta'] + terbilang_(n % 1000000)
elif n >= 2000:
hasil = terbilang_(n / 1000) + ['ribu'] + terbilang_(n % 1000)
elif n >= 1000:
hasil = ['seribu'] + terbilang_(n - 1000)
elif n >= 200:
hasil = terbilang_(n / 100) + ['ratus'] + terbilang_(n % 100)
elif n >= 100:
hasil = ['seratus'] + terbilang_(n - 100)
elif n >= 20:
hasil = terbilang_(n / 10) + ['puluh'] + terbilang_(n % 10)
elif n >= 12:
hasil = terbilang_(n % 10) + ['belas']
else:
hasil = [satuan[n]]
print('hasil')
return hasil
def terbilang(n):
if n == 0:
return 'nol'
t = terbilang_(n)
while '' in t:
t.remove('')
return ' '.join(t)
def number_only(value):
return re.sub('\D', "", value)
def set_user_log(message, request, logobj=None, user_name=None):
if not logobj:
logobj = log
if not request:
logobj.info("Request not Set")
return
if not user_name:
user_name = request and request.user and request.user.user_name or None
if not user_name:
logobj.info("User Name not Set")
return
addr = request.client_addr
message = "User {} at Addr {}: {}".format(user_name, addr, message)
logobj.warning(message)
def create_static_path(config, folder, url, cache_max_age=3600):
log.debug("Folder to create: %s", folder)
if not os.path.exists(folder):
try:
os.makedirs(folder)
except Exception as e:
log.debug("User: %s", os.path.expanduser("~"))
log.debug("Error: %s", str(e))
config.add_static_view(url, path=folder, cache_max_age=cache_max_age)
def includeme(config):
config.add_translation_dirs('opensipkd.tools:locale/')
>>>>>>> ac4770c010153a01ecbc3f6c0fa8b3dff80e62df
Write
Preview
Markdown
is supported
Attach a file
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to post a comment