Commit 00991536 by aa.gusti

init

0 parents
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
env2/
env_*/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
inventori/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# dotenv
.env
# bat file
*.bat
# virtualenv
.venv
venv/
ENV/
# Spyder project settings
.spyderproject
# Rope project settings
.ropeproject
*.ini
.idea/
.project/
.DS_Store
opensipkd/base/static/img/*
!opensipkd/base/static/img/pyramid.png
!opensipkd/base/static/img/opensipkd.png
!opensipkd/base/static/img/opensipkd_bg.png
!opensipkd/base/static/img/ajax-loader.gif
!opensipkd/base/static/img/line.png
alembic.ini
alembic_local.ini
0.0.1 20-09-2021
Pertaman kali dibuat/dipisahkan
File mode changed
Tools Aplikasi openSIPKD
========================
Ini adalah tools yang sering digunakan dalam development openSIPKD.
Pemasangan
----------
Pasang paket Debian yang dibutuhkan::
$ sudo apt install libqpdf-dev
Buat Python Virtual Environment, biasanya pada home directory::
$ python3 -m venv ~/env
$ ~/env/bin/pip install --upgrade pip setuptools
Install Production
------------------
$ ~/env/bin/pip install git+https://git.opensipkd.com/aa.gusti/tools.git
Install Development::
-------------------
$ mkdir apps
$ cd apps
$ git clone https://git.opensipkd.com/aa.gusti/tools.git
$ env/bin/pip install -e tools
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
\ No newline at end of file
from __future__ import print_function
import os
import re
import mimetypes
import csv
import calendar
# from datetime import (date, datetime, timedelta, )
import datetime
from random import choice
from string import (ascii_uppercase, ascii_lowercase, digits, )
import locale
import pytz
import io
from pyramid.threadlocal import get_current_registry
# from calendar import monthrange
from json import JSONEncoder
import logging
log = logging.getLogger(__name__)
################
# Phone number #
################
MSISDN_ALLOW_CHARS = map(lambda x: str(x), range(10)) # + ['+']
def get_msisdn(msisdn, country='+62'):
"""
Digunakan untuk pengecekan no telp suatu negara
:param msisdn:
:param country:
:return: output kode negara+no msisdn
"""
# for ch in msisdn:
# if ch not in MSISDN_ALLOW_CHARS:
# raise Exception(f'{ch} not in {print(x for x in range(10))}')
try:
i = int(msisdn)
except ValueError as e:
log.info(e)
return
if not i:
raise Exception('MSISDN must filled')
if len(str(i)) < 7:
raise Exception('MSISDN less then 7 number')
if re.compile(r'^\+').search(msisdn):
return msisdn
if re.compile(r'^0').search(msisdn):
return '%s%s' % (country, msisdn.lstrip('0'))
################
# Money format #
################
def should_int(value):
int_ = int(value)
if int_ == value:
return int_
return value
def thousand(value, float_count=None):
"""
Memisahkan ribuan
:param value:
:param float_count:
:return: string
"""
if not value:
return "0"
if float_count is None: # autodetection
if type(value) in (int, float):
float_count = 0
else:
float_count = 2
return locale.format_string('%%.%df' % float_count, value, True)
def money(value, float_count=None, currency=None):
"""
Memisahkan ribuan dan menambahkan nama mata uang
:param value:
:param float_count:
:param currency:
:return:
"""
if value < 0:
v = abs(value)
format_ = '(%s)'
else:
v = value
format_ = '%s'
if currency is None:
currency = locale.localeconv()['currency_symbol']
s = ' '.join([currency, thousand(v, float_count)])
return format_ % s
def round_up(n):
i = int(n)
if n == i:
return i
if n > i:
return i + 1
return i - 1
###########
# Pyramid #
###########
def get_settings():
return get_current_registry().settings
def devel():
settings = get_settings()
is_devel = 'devel' in settings and settings['devel'] and True or False
print("DEBUG>>", is_devel)
return is_devel
def get_timezone():
settings = get_settings()
return pytz.timezone(settings['timezone'])
def get_modules():
settings = get_settings()
return settings['modules'].split(',')
########
# Time #
########
one_second = datetime.timedelta(1.0 / 24 / 60 / 60)
DateType = type(datetime.date.today())
DateTimeType = type(datetime.datetime.now())
TimeZoneFile = '/etc/timezone'
if os.path.exists(TimeZoneFile):
DefaultTimeZone = open(TimeZoneFile).read().strip()
else:
DefaultTimeZone = 'Asia/Jakarta'
def as_timezone(tz_date):
localtz = get_timezone()
if not tz_date.tzinfo:
tz_date = create_datetime(tz_date.year, tz_date.month, tz_date.day,
tz_date.hour, tz_date.minute, tz_date.second,
tz_date.microsecond)
return tz_date.astimezone(localtz)
def create_datetime(year, month, day, hour=0, minute=7, second=0,
microsecond=0):
tz = get_timezone()
dt = datetime.datetime(year, month, day, hour, minute, second, microsecond)
return tz.localize(dt)
def create_date(year, month, day):
return create_datetime(year, month, day)
def create_now():
tz = get_timezone()
return datetime.datetime.now(tz)
def date_from_str(value):
separator = None
value = value.split()[0] # dd-mm-yyyy HH:MM:SS
for s in ['-', '/', '.']:
if value.find(s) > -1:
separator = s
break
if separator:
t = list(map(lambda x: int(x), value.split(separator)))
y, m, d = t[2], t[1], t[0]
if d > 999: # yyyy-mm-dd
y, d = d, y
else:
y, m, d = int(value[:4]), int(value[4:6]), int(value[6:])
return datetime.date(y, m, d)
def time_from_str(value):
# separator = ":"
value = value.split()[1] # dd-mm-yyyy HH:MM:SS
return value.strptime('%H:%M:%S')
def datetime_from_str(value):
# separator = None
dt = date_from_str(value)
tm = time_from_str(value)
return dt + tm
# value = value.split()[0] # dd-mm-yyyy HH:MM:SS
# for s in ['-', '/', '.']:
# if value.find(s) > -1:
# separator = s
# break
# if separator:
# t = list(map(lambda x: int(x), value.split(separator)))
# y, m, d = t[2], t[1], t[0]
# if d > 999: # yyyy-mm-dd
# y, d = d, y
# else:
# y, m, d = int(value[:4]), int(value[4:6]), int(value[6:])
# return datetime.date(y, m, d)
def dmy(tgl):
return tgl.strftime('%d-%m-%Y')
def hms(tgl):
return tgl.strftime('%H:%M:%S')
def dmy_to_date(tgl):
return datetime.datetime.strptime(tgl, '%d-%m-%Y')
def dmyhms(tgl):
return tgl.strftime('%d-%m-%Y %H:%M:%S')
def ymd(tgl):
return tgl.strftime('%Y-%m-%d')
def ymdhms(tgl):
return tgl.strftime('%Y-%m-%d %H:%M:%S')
def dMy(tgl):
return str(tgl.day) + ' ' + NAMA_BULAN[tgl.month][1] + ' ' + str(tgl.year)
def tampil_bulan(bulan):
return NAMA_BULAN[bulan][1]
def next_month(year, month):
if month == 12:
month = 1
year += 1
else:
month += 1
return year, month
def best_date(year, month, day):
try:
return datetime.date(year, month, day)
except ValueError:
last_day = calendar.monthrange(year, month)[1]
return datetime.date(year, month, last_day)
def next_month_day(year, month, day):
year, month = next_month(year, month)
return best_date(year, month, day)
def count_day(date):
date = str(date) # 2018-01-02 list
year = int(date[:4])
if date[5] == '0':
month = int(date[6])
else:
month = int(date[5:7])
count_day = calendar.monthrange(year, month)
return int(count_day[1])
NAMA_BULAN = (
(0, "--Bulan--"),
(1, "Januari", "Jan"),
(2, "Februari", "Feb"),
(3, "Maret", "Mar"),
(4, "April", "Apr"),
(5, "Mei", "Mei"),
(6, "Juni", "Jun"),
(7, "Juli", "Jul"),
(8, "Agustus", "Agu"),
(9, "September", "Sep"),
(10, "Oktober", "Okt"),
(11, "Nopember", "Nov"),
(12, "Desember", "Des"),
)
##########
# String #
##########
def one_space(s):
s = s.strip()
while s.find(' ') > -1:
s = s.replace(' ', ' ')
return s
def to_str(v):
typ = type(v)
if typ == DateType:
return dmy(v)
if typ == DateTimeType:
return dmyhms(v)
if v == 0:
return '0'
if typ == str:
return v.strip()
elif typ == bool:
return v and '1' or '0'
return v and str(v) or ''
def dict_to_str(d):
r = {}
for key in d:
val = d[key]
r[key] = to_str(val)
return r
def split(s, c=4):
r = []
while s:
t = s[:c]
r.append(t)
s = s[c:]
return ' '.join(r)
def url_join(*args):
url = ''
for arg in args:
if arg:
aurl = arg.strip('/')
url = url + aurl + '/'
url = url.rstrip('/')
return url
def upper_dict(d):
new_dict = dict((str(k).upper(), v) for k, v in d.items())
return new_dict
########
# File #
########
# http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
def get_random_string(width=6):
return ''.join(choice(ascii_uppercase + ascii_lowercase + digits) \
for _ in range(width))
def get_random_number(width=12):
return ''.join(choice(digits) \
for _ in range(width))
def get_ext(filename):
return os.path.splitext(filename)[-1]
def get_filename(filename):
return "".join(os.path.splitext(filename)[:-1])
def file_type(filename):
ctype, encoding = mimetypes.guess_type(filename)
if ctype is None or encoding is not None:
ctype = 'application/octet-stream'
return ctype
class SaveFile(object):
def __init__(self, dir_path):
self.dir_path = dir_path
# Unchanged file extension, and make file prefix unique with sequential
# number.
def create_fullpath(self, ext=''):
while True:
filename = get_random_string() + ext
full_path = os.path.join(self.dir_path, filename)
if not os.path.exists(full_path):
return full_path
def save(self, content, filename=None):
fullpath = self.create_fullpath()
f = open(fullpath, 'wb')
f.write(content)
f.close()
return fullpath
class Upload(SaveFile):
def save_to_file(self, input_file, ext):
fullpath = self.create_fullpath(ext)
output_file = open(fullpath, 'wb')
input_file.seek(0)
while True:
data = input_file.read(2 << 16)
if not data:
break
output_file.write(data)
output_file.close()
return fullpath
def save(self, request, name):
input_file = request.POST[name].file
ext = get_ext(request.POST[name].filename)
self.save_to_file(input_file, ext)
def saves(self, uploads):
d = {}
for upload in uploads:
if 'fp' not in upload or upload['fp'] == b'':
continue
filename = upload['filename']
input_file = upload['fp']
ext = get_ext(filename)
d[filename] = self.save_to_file(input_file, ext)
return d
class MemoryTmpStore(dict):
""" Instances of this class implement the
:class:`deform.interfaces.FileUploadTempStore` interface"""
def preview_url(self, uid):
return None
mem_tmp_store = MemoryTmpStore()
class UploadBin(Upload):
"""
Compatibility to previous
"""
pass
class CSVRenderer(object):
def __init__(self, info):
pass
def __call__(self, value, system):
""" Returns a plain CSV-encoded string with content-type
``text/csv``. The content-type may be overridden by
setting ``request.response.content_type``."""
request = system.get('request')
if request is not None:
response = request.response
ct = response.content_type
if ct == response.default_content_type:
response.content_type = 'text/csv'
# error ketika open csv menggunakan bytes
# karena tak mau hapus, maka dibikin try
try:
fout = io.BytesIO() #
fcsv = csv.writer(fout, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
fcsv.writerow(value.get('header', []))
fcsv.writerows(value.get('rows', []))
except:
fout = io.StringIO()
fcsv = csv.writer(fout, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
fcsv.writerow(value.get('header', []))
fcsv.writerows(value.get('rows', []))
return fout.getvalue()
# # Data Table Function
# def _DTstrftime(chain):
# ret = chain and datetime.strftime(chain, '%d-%m-%Y')
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTnumber_format(chain):
# import locale
# locale.setlocale(locale.LC_ALL, 'id_ID.utf8')
# ret = locale.format("%d", chain, grouping=True)
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTupper(chain):
# ret = chain.upper()
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTstatus(chain):
# if chain:
# ret = 'Aktif'
# else:
# ret = 'Mati'
# if ret:
# return ret
# else:
# return chain
##########
# String #
##########
def clean(s):
r = ''
for ch in s:
asc = ord(ch)
if asc > 126 or asc < 32:
ch = ' '
r += ch
return r
# def to_str(s):
# s = s or ''
# s = str(s)
# return clean(s)
def left(s, width):
s = to_str(s)
return s.ljust(width)[:width]
def right(s, width):
s = to_str(s)
return s.zfill(width)[:width]
##################
# Data Structure #
##################
class FixLength(object):
def __init__(self, struct):
self.struct = self.fields = None
self.set_struct(struct)
def set_struct(self, struct):
self.struct = struct
self.fields = {}
for s in struct:
name = s[0]
size = s[1:] and s[1] or 1
typ = s[2:] and s[2] or 'A' # N: numeric, A: alphanumeric
self.fields[name] = {'value': None, 'type': typ, 'size': size}
def set(self, name, value):
self.fields[name]['value'] = value
def get(self, name):
v = self.fields[name]['value']
if v:
return v.strip()
return ''
def __setitem__(self, name, value):
self.set(name, value)
def __getitem__(self, name):
return self.get(name)
def get_raw(self):
return self.get_spliter("")
def get_raw_dotted(self):
return self.get_dotted()
def get_value(self, name, typ):
v = self.fields[name]['value']
if v and typ == 'N':
i = int(v)
if v == i:
v = i
return v
def get_spliter(self, ch):
s = ''
for name, size, typ in self.struct:
v = self.get_value(name, typ)
pad_func = typ == 'N' and right or left
s += pad_func(v, size)
s += ch
return s[:-1]
def get_dotted(self):
return self.get_spliter('.')
def set_raw(self, raw):
awal = 0
for t in self.struct:
name = t[0]
size = t[1:] and t[1] or 1
akhir = awal + size
value = raw[awal:akhir]
if not value:
return
self.set(name, value)
awal += size
return True
def from_dict(self, d):
for name in self.fields:
if name in d and d[name]:
value = d[name]
self.set(name, value)
def to_dict(self):
r = {}
for name in self.fields:
r[name] = self.get(name)
return r
def length(self):
return len(self.get_raw())
import decimal
class Encoder(JSONEncoder):
"""Extends JSONEncoder for date and decimal types."""
def push_date(self, d):
"""Serialize the given datetime.date object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
return "%04d-%02d-%02d" % (d.year, d.month, d.day)
def push_timedelta(self, t):
"""Serialize the given datetime.timedelta object to a JSON string."""
days = t.days
if days < 0:
minus = "-"
days = -days - 1
seconds = 24 * 60 * 60 - t.seconds
else:
minus = ""
seconds = t.seconds
secs = seconds % 60
seconds /= 60
mins = seconds % 60
hours = seconds / 60
return "%s%d:%02d:%02d:%02d" % (minus, days, hours, mins, secs)
def push_time(self, t):
"""Serialize the given datetime.time object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
return "%02d:%02d:%02d" % (t.hour, t.minute, t.second)
def push_datetime(self, dt):
"""Serialize the given datetime.datetime object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
# Don't use strftime because that can't handle dates before 1900.
return ("%04d-%02d-%02dT%02d:%02d:%02d" %
(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second))
def default(self, o):
# We MUST check for a datetime.datetime instance before datetime.date.
# datetime.datetime is a subclass of datetime.date, and therefore
# instances of it are also instances of datetime.date.
if isinstance(o, datetime.datetime):
return self.push_datetime(o)
elif isinstance(o, datetime.date):
return self.push_date(o)
elif isinstance(o, datetime.timedelta):
return self.push_timedelta(o)
elif isinstance(o, datetime.time):
return self.push_time(o)
elif isinstance(o, decimal.Decimal):
return str(o)
else:
return JSONEncoder.default(self, o)
STATUS = (
(1, 'Aktif'),
(0, 'Pasif')
)
INVOICE_STATUS = (
(0, 'Draft'),
(1, 'Approved'),
)
def pbb_biller_split():
settings = get_settings()
if not 'pbb_biller' in settings:
return None, None
biller = re.sub('\D', '', settings['pbb_biller'])
return biller[:2], biller[2:]
def clear_null_value(values):
# digunakan untuk menghapus dictionary yang value nya null
tobe_del = []
for value in values:
if not values[value]:
tobe_del.append(value)
for value in tobe_del:
del values[value]
return values
def replace_char(text, start, stop, char="X"):
x = char * (stop - start + 1)
s = text[:start] + x + text[stop:]
# print "*** DEBUG ", text, s
return s
##################
# Email #
##################
# import smtplib
# from email.MIMEMultipart import MIMEMultipart
# from email.MIMEText import MIMEText
# from email.MIMEBase import MIMEBase
# import base64
SCOPES = [
'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/userinfo.profile',
# Add other requested scopes.
]
# def send_email(to_addr_list, subject, message, cc_addr_list=None,from_addr=None,
# login=None, password=None, file=None,
# smtp_server='smtp.gmail.com:587'):
# settings = get_settings()
# if settings['devel']=='true':
# return
#
# if not login:
#
# login = 'center.email' in settings and settings['center.email'] or None
# # if login[-9:]=="gmail.com":
# # return send_gmsg(to=to_addr_list, subject=subject, message_text=message, file=file,)
#
# if not password:
# password = 'center.email_password' in settings and settings['center.email_password'] or None
# if not login or not password:
# return 'Error smtp login or Password'
#
# if 'center.smtp_server' in settings and settings['center.smtp_server']:
# smtp_server = settings['center.smtp_server']
#
# if not from_addr:
# from_addr = login
#
# header = 'From: %s\r\n' % from_addr
# header += 'To: %s \r\n' % ','.join(to_addr_list)
# if cc_addr_list:
# header += 'Cc: %s \r\n' % ','.join(cc_addr_list)
# header += 'Subject: %s \r\n' % subject
# message = header + message
# smtp, port = smtp_server.split(':')
# port = int(port)
#
# #print '>>>>>>>>>>>>', smtp, port, login,password
# try:
# #print smtp_server, smtp, port
# server = smtplib.SMTP(smtp,port) #smtp_server)
# server.ehlo()
# if port==587:
# server.starttls()
#
# server.login(login,password)
# problems = server.sendmail(from_addr, to_addr_list, message)
# server.quit()
# return
# except Exception as e:
# return str(e)
#
# def create_message(sender, to, subject, message_text, file=None):
# """Create a message for an email.
#
# Args:
# sender: Email address of the sender.
# to: Email address of the receiver.
# subject: The subject of the email message.
# message_text: The text of the email message.
# file: The path to the file to be attached.
#
# Returns:
# An object containing a base64url encoded email object.
# """
# if file:
# message = MIMEMultipart()
# msg = MIMEText(message_text)
# else:
# message = MIMEText(message_text)
#
# message['to'] = to
# message['from'] = sender
# message['subject'] = subject
#
# if file:
# message.attach(msg)
# content_type, encoding = mimetypes.guess_type(file)
#
# if content_type is None or encoding is not None:
# content_type = 'application/octet-stream'
# main_type, sub_type = content_type.split('/', 1)
# if main_type == 'text':
# fp = open(file, 'rb')
# msg = MIMEText(fp.read(), _subtype=sub_type)
# fp.close()
# elif main_type == 'image':
# fp = open(file, 'rb')
# msg = MIMEImage(fp.read(), _subtype=sub_type)
# fp.close()
# elif main_type == 'audio':
# fp = open(file, 'rb')
# msg = MIMEAudio(fp.read(), _subtype=sub_type)
# fp.close()
# else:
# fp = open(file, 'rb')
# msg = MIMEBase(main_type, sub_type)
# msg.set_payload(fp.read())
# fp.close()
# filename = os.path.basename(file)
# msg.add_header('Content-Disposition', 'attachment', filename=filename)
# message.attach(msg)
#
# return {'raw': base64.urlsafe_b64encode(message.as_string())}
#
#
#
# def get_service():
# from googleapiclient.discovery import build
# from httplib2 import Http
# from oauth2client import file, client, tools
# """Shows basic usage of the Gmail API.
# Lists the user's Gmail labels.
# """
# # The file token.json stores the user's access and refresh tokens, and is
# # created automatically when the authorization flow completes for the first
# # time.
# store = file.Storage('token.json')
# creds = store.get()
# if not creds or creds.invalid:
# flow = client.flow_from_clientsecrets('./credentials.json', SCOPES)
# creds = tools.run_flow(flow, store)
# return build('gmail', 'v1', http=creds.authorize(Http()))
#
# def send_gmsg(to, subject, message_text, file=None):
# settings = get_settings()
# sender = 'center.email' in settings and settings['center.email'] or None
#
# message = create_message(sender, to, subject, message_text, file=None)
# service = get_service()
# try:
# message = (service.users().messages().send(userId=user_id, body=message)
# .execute())
# return message
# except errors.HttpError as error:
# pass
#
# http://python.web.id/angka-terbilang-pada-python/#.VQpS8s2sXQo
satuan = ['', 'satu', 'dua', 'tiga', 'empat', 'lima', 'enam', 'tujuh',
'delapan', 'sembilan', 'sepuluh', 'sebelas']
def terbilang_(n):
n = int(n)
if n >= 1000000000:
hasil = terbilang_(n / 1000000000) + ['milyar'] + terbilang_(n % 100000000)
elif n > 1000000:
hasil = terbilang_(n / 1000000) + ['juta'] + terbilang_(n % 1000000)
elif n >= 2000:
hasil = terbilang_(n / 1000) + ['ribu'] + terbilang_(n % 1000)
elif n >= 1000:
hasil = ['seribu'] + terbilang_(n - 1000)
elif n >= 200:
hasil = terbilang_(n / 100) + ['ratus'] + terbilang_(n % 100)
elif n >= 100:
hasil = ['seratus'] + terbilang_(n - 100)
elif n >= 20:
hasil = terbilang_(n / 10) + ['puluh'] + terbilang_(n % 10)
elif n >= 12:
hasil = terbilang_(n % 10) + ['belas']
else:
hasil = [satuan[n]]
print('hasil')
return hasil
def terbilang(n):
if n == 0:
return 'nol'
t = terbilang_(n)
while '' in t:
t.remove('')
return ' '.join(t)
def number_only(value):
return re.sub('\D', "", value)
def set_user_log(message, request, logobj=None, user_name=None):
if not logobj:
logobj = log
if not request:
logobj.info("Request not Set")
return
if not user_name:
user_name = request and request.user and request.user.user_name or None
if not user_name:
logobj.info("User Name not Set")
return
addr = request.client_addr
message = "User {} at Addr {}: {}".format(user_name, addr, message)
logobj.warning(message)
import base64
import hashlib
import hmac
import sys
from datetime import datetime
from pyramid_rpc.jsonrpc import JsonRpcError
LIMIT = 1000
CODE_OK = 0
MSG_OK = 'Data Submitted'
import logging
log = logging.getLogger(__name__)
class JsonRpcOtherError(JsonRpcError):
code = -32604
message = 'Other Error'
# error terkait database
class JsonRpcNotImplementedError(JsonRpcError):
code = -40000
message = "Method Not Implemented"
class JsonRpcDbUrlNotFoundError(JsonRpcError):
code = -40001
message = "DB URL Connection Not Found"
class JsonRpcDbConnectionError(JsonRpcError):
code = -40002
message = "DB Connection Not Found"
class JsonRpcDataNotFoundError(JsonRpcError):
code = -40003
message = "Error Data Not Found"
class JsonRpcDataFoundError(JsonRpcError):
code = -40004
message = "Error Data Found"
# error terkait authentication dan authorization
class JsonRpcInvalidLoginError(JsonRpcError):
code = -41001
message = "Invalid RPC User/Password"
class JsonRpcInvalidNikError(JsonRpcError):
code = -41002
message = 'NIK Tidak Valid'
class JsonRpcNikFoundError(JsonRpcError):
code = -41003
message = 'NIK Sudah Ada'
class JsonRpcMobileFoundError(JsonRpcError):
code = -41004
message = 'No HP Sudah Digunakan'
class JsonRpcInvalidMobileError(JsonRpcError):
code = -41004
message = 'No HP Tidak Valid'
class JsonRpcInvalidDataError(JsonRpcError):
code = -41005
message = 'Data Tidak Valid'
class JsonRpcEmailFoundError(JsonRpcError):
code = -41006
message = 'e-mail Sudah Digunakan'
class JsonRpcInvalidEmailError(JsonRpcError):
code = -41007
message = 'e-mail Tidak Valid'
class JsonRpcMailError(JsonRpcError):
code = -41008
message = 'Gagal autentikasi mail server'
class JsonRpcUserFoundError(JsonRpcError):
code = -41009
message = 'User sudah digunakan'
class JsonRpcRegisterFailError(JsonRpcError):
code = -41010
message = 'Gagal Registrasi User'
class JsonRpcProfileFailError(JsonRpcError):
code = -41011
message = 'Gagal Update Profile'
class JsonRpcGetPasswordError(JsonRpcError):
code = -41012
message = 'Gagal Request Password'
class JsonRpcUserNotFoundError(JsonRpcError):
code = -41013
message = 'User Tidak Ada'
class JsonRpcInvalidTimeError(JsonRpcError):
code = -41014
message = 'Periksa Date Time Server'
class JsonRpcPermissionError(JsonRpcError):
code = -41015
message = 'Hak Akses dibatasi'
# error terkait transaksi
class JsonRpcInvalidInvoiceError(JsonRpcError):
code = -50001
message = 'Invalid No Bayar'
class JsonRpcBankNotFoundError(JsonRpcError):
code = -51001
message = 'Bank Not Found'
class JsonRpcBillNotFoundError(JsonRpcError):
code = -52001
message = 'Bill Not Found'
# class JsonRpcDataNotFoundError(JsonRpcError):
# code = -52001
# message = 'Data Not Found'
class JsonRpcBillAllreadyPaidError(JsonRpcError):
code = -52002
message = 'Bill Allready Paid'
class JsonRpcBillDifferentError(JsonRpcError):
code = -52003
message = 'Bill Amount Different'
class JsonRpcNtbNotFoundError(JsonRpcError):
code = -53001
message = 'NTB Not Found'
class JsonRpcNtbNotValidError(JsonRpcError):
code = -53002
message = 'NTB Not Valid'
# Reversal Message
class JsonRpcPaymentNotFoundError(JsonRpcError):
code = -54001
message = 'Payment Not Found'
class JsonRpcPaymentNotOwnerError(JsonRpcError):
code = -54002
message = 'Not Your Payment'
class JsonRpcPaymentOpenError(JsonRpcError):
code = -54003
message = 'Payment Open'
class JsonRpcReversedError(JsonRpcError):
code = -54004
message = 'Payment Reversed'
class JsonRpcReversalError(JsonRpcError):
code = -54005
message = 'Cannot Reversed'
class JsonRpcPaymentFoundError(JsonRpcError):
code = -54006
message = 'Payment Found with same invoice and ntb'
# Biller Status
class JsonRpcBillerNotFoundError(JsonRpcError):
code = -55001
message = 'Biller Not Found'
class JsonRpcBillerNetworkError(JsonRpcError):
code = -55002
message = 'Biller Network unrecognized'
class JsonRpcBillerError(JsonRpcError):
code = -55000
message = 'Biller Other Error'
class JsonRpcProdukNotFoundError(JsonRpcError):
code = -56001
message = 'Produk Not Found'
# Biller Status
class JsonRpcCaNotRegisteredError(JsonRpcError):
code = -56001
message = 'CA Not Registered'
def custom_error(code, message):
cls = JsonRpcError()
cls.code = code
cls.message = message
return cls
begin_unix_time = datetime(1970, 1, 1)
def get_seconds(current_time=None):
"""
Digunakan untuk menghitung jumlah detik sampai dengan waktu sekarang
:param current_time: optional waktu yang akan dihitung
:return:
"""
if not current_time:
current_time = datetime.utcnow()
durasi = current_time - begin_unix_time
return int(durasi.total_seconds())
def json_rpc_header(userid, password, key=None):
"""
Digunakan untuk membuat header autentikasi saat akan mengirim request
:param userid: string,
:param password: string,
:param key: optional string berisi kode enkripsi
:return: {
userid string,
signature=string,
key=string
}
"""
if not key:
key = str(get_seconds())
value = '&'.join([str(userid), str(key)])
password = str(password)
signature = hmac.new(key=str.encode(password), msg=str.encode(value),
digestmod=hashlib.sha256).digest()
if sys.version < '3':
encoded_signature = base64.encodestring(signature).replace('\n', '')
else:
encoded_signature = base64.encodebytes(signature).decode().replace('\n', '')
return dict(userid=userid, signature=encoded_signature, key=key)
##############################
# Web Service Authentication #
##############################
lima_menit = 300
from deform.form import Button
btn_add = Button('add', title='Tambah', css_class="btn-success")
btn_view = Button('view', title='Lihat', css_class="btn-info")
btn_edit = Button('edit', title='Edit', css_class="btn-success")
btn_delete = Button('delete', title='Hapus', css_class="btn-danger")
btn_save = Button('save', title='Simpan', css_class="btn-primary")
btn_force = Button('force', title='Paksa', css_class="btn-info")
btn_upload = Button('upload', title='Upload', css_class="btn-info")
btn_cancel = Button('cancel', title='Batal', css_class="btn-warning")
btn_reset = Button('reset', title='Reset', css_class="btn-warning", type="reset")
btn_close = Button('close', title='Tutup', css_class="btn-danger")
btn_print = Button('print', title='Print', css_class="btn-info", type="button")
btn_pdf = Button('pdf', title='PDF', css_class="btn-info", type="button")
btn_csv = Button('csv', title='CSV', css_class="btn-info", type="button")
btn_txt = Button('txt', title='TXT', css_class="btn-info", type="button")
btn_label = Button('label', title='Print Label', css_class="btn-info", type="button")
btn_print_ttr = Button('ttr', title='Tanda Terima', css_class="btn-info", type="button")
btn_barcode = Button('barcode', title='Barcode', css_class="btn-success", type="button")
btn_qrcode = Button('qrcode', title='QRcode', css_class="btn-success", type="button")
btn_barcode_nop = Button('barcode_nop', title='Barcode NOP', css_class="btn-info", type="button")
btn_inquiry = Button('inquiry', title='Inquiry', css_class="btn-info", type="submit")
btn_payment = Button('payment', title='Payment', css_class="btn-success", type="submit")
btn_accept = Button('accept', title='Terima', css_class="btn-success")
btn_reject = Button('reject', title='Tolak', css_class="btn-danger")
btn_prev = Button('prev', title='Kembali', css_class="btn-success")
btn_next = Button('next', title='Lanjut', css_class="btn-success")
btn_send = Button('send', title='Kirim', css_class="btn-success")
btn_search = Button('search', title='Cari', css_class="btn-success")
btn_proses = Button('proses', title='Proses', css_class="btn-info", type="button")
inquiry_button = [btn_inquiry, btn_cancel]
payment_button = [btn_payment, btn_cancel]
print_button = [btn_print, btn_cancel]
proses_button = [btn_proses, btn_cancel]
save_buttons = [btn_save, btn_cancel]
update_buttons = [btn_add, btn_edit, btn_delete, btn_close]
flow_buttons = [btn_prev, btn_accept, btn_reject, btn_next]
delete_buttons = [btn_delete, btn_cancel]
flow_1_buttons = [btn_next, btn_cancel]
force_buttons = [btn_force, btn_cancel]
flow_2_buttons = [btn_prev, btn_next, btn_cancel]
pdf_txt_buttons = [btn_pdf, btn_txt, btn_close]
pdf_buttons = [btn_pdf, btn_close]
import random
import string
from wheezy.captcha.image import captcha, background, noise, smooth, text, offset
from ..tools import *
def img_captcha(request, length=5, chars=string.ascii_uppercase + string.digits):
kode_captcha = randomcaptcha(length, chars)
_here = os.path.dirname(__file__)
# captcha_path = os.path.abspath('/static/img/captcha.png'))
# font_path = os.path.abspath(os.path.join('/static/v3/fonts/arial.ttf'))
settings = get_settings()
captcha_files = settings['captcha_files']
from pyramid.path import AssetResolver
a = AssetResolver('opensipkd.base')
static_path = a.resolve('static').abspath()
captcha_path = os.path.abspath(os.path.join(captcha_files, kode_captcha + '.png'))
font_path = os.path.abspath(os.path.join(static_path, 'v3/fonts/arial.ttf'))
captcha_image = captcha(drawings=[
background(color='#fff'),
text(fonts=[font_path],
drawings=[
offset(0.8)
]),
noise(),
smooth()
], width=300)
image = captcha_image(' ' + kode_captcha + ' ')
image.save(captcha_path, 'PNG', quality=75)
kode_captcha = kode_captcha
request.session['captcha'] = kode_captcha
return kode_captcha
def randomcaptcha(length, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for i in range(length))
def get_captcha(request):
kode_captcha = img_captcha(request)
request.session['captcha'] = kode_captcha
return kode_captcha
\ No newline at end of file
# Tools PBB Constanta yang digunakan di PBB karena dalam PBB primary Key
# Banyak terdiri dari beberapa field
# from types import (
# StringType,
# UnicodeType)
from ..tools import FixLength, get_settings
# from models import pbbDBSession
# from models.ref import TempatPembayaran
# from models.pegawai import DatLogin
from datetime import timedelta, datetime
from math import ceil
import re
from . import left, right
PROPINSI = [('kd_propinsi', 2, 'N'), ]
DATI2 = list(PROPINSI)
DATI2.append(('kd_dati2', 2, 'N'))
KECAMATAN = list(DATI2)
KECAMATAN.append(('kd_kecamatan', 3, 'N'))
KELURAHAN = list(KECAMATAN)
KELURAHAN.append(('kd_kelurahan', 3, 'N'))
BLOK = list(KELURAHAN)
BLOK.append(('kd_blok', 3, 'N'))
NOP = list(BLOK)
NOP.extend([('no_urut', 4, 'N'),
('kd_jns_op', 1, 'N')])
SPPT = list(NOP)
SPPT.append(('thn_pajak_sppt', 4, 'N'))
KANTOR = [('kd_kanwil', 2, 'N'),
('kd_kantor', 2, 'N')]
BANK = list(KANTOR)
BANK.append(('kd_tp', 2, 'N'))
BANK_SISMIOP = [
('kd_kanwil_bank', 2, 'N'),
('kd_kppbb_bank', 2, 'N'),
('kd_bank_tunggal', 2, 'N'),
('kd_bank_persepsi', 2, 'N'),
('kd_tp', 2, 'N'),
]
BAYAR = list(SPPT)
BAYAR.append(('pembayaran_sppt_ke', 3, 'N'))
BAYAR.extend(BANK)
SIKLUS = list(SPPT)
SIKLUS.append(('siklus_sppt', 3, 'N'))
NOPEL = list(KANTOR)
NOPEL.extend([('tahun', 4, 'N'),
('bundel', 4, 'N'),
('urut', 3, 'N')])
NOPELDET = list(NOPEL)
NOPELDET.extend(list(NOP))
AGENDA = [('thn_agenda_kirim', 4, 'N'),
('kd_seksi', 2, 'N'),
('no_agenda_kirim', 4, 'N')]
BERKAS = list(NOPELDET)
BERKAS.extend(list(AGENDA))
def get_value(obj, struct):
raw = ""
for name, typ, size in struct:
raw += obj[name]
return raw
class SetFixLength(FixLength):
def __init__(self, raw, struct):
super(SetFixLength, self).__init__(struct)
raw = re.sub("\D", "", raw)
self.set_raw(raw)
for name, typ, size in struct:
setattr(self, name, self[name])
class BaseFixLength(SetFixLength):
def __init__(self, raw):
super(BaseFixLength, self).__init__(raw, self.get_structure())
raw = re.sub("\D", "", raw)
self.set_raw(raw)
def get_structure(self):
pass
class FixDati2(BaseFixLength):
def get_structure(self):
return DATI2
class FixKecamatan(BaseFixLength):
def get_structure(self):
return KECAMATAN
class FixKelurahan(BaseFixLength):
def get_structure(self):
return KELURAHAN
class FixBlok(BaseFixLength):
def get_structure(self):
return BLOK
class FixKantor(BaseFixLength):
def get_structure(self):
return KANTOR
class FixBank(BaseFixLength):
def get_structure(self):
return BANK
class FixBankSismiop(BaseFixLength):
def get_structure(self):
return BANK_SISMIOP
class FixNop(BaseFixLength):
def get_structure(self):
return NOP
def get_formatted(self):
s = ''
for name, size, typ in self.struct:
v = self.get_value(name, typ)
pad_func = typ == 'N' and right or left
s += pad_func(v, size)
if len(s) in [5, 11, 19]:
s += "-"
else:
s += "."
return s[:-1]
class FixSppt(BaseFixLength):
def get_structure(self):
return SPPT
class FixSiklus(BaseFixLength):
def get_structure(self):
return SIKLUS
class FixBayar(BaseFixLength):
def get_structure(self):
return BAYAR
def get_sppt(self):
return get_value(self, SPPT)
def get_bank(self):
return get_value(self, BANK)
class FixNopel(BaseFixLength):
def get_structure(self):
return NOPEL
def get_kantor(self):
return get_value(self, KANTOR)
class FixNopelDetail(BaseFixLength):
def get_structure(self):
return NOPELDET
def get_nopel(self):
return get_value(self, NOPEL)
def get_nop(self):
return get_value(self, NOP)
class FixAgenda(BaseFixLength):
def get_structure(self):
return AGENDA
class FixBerkas(BaseFixLength):
def get_structure(self):
return BERKAS
def get_nopel(self):
return get_value(self, NOPEL)
def get_nop(self):
return get_value(self, NOP)
def get_nopel_det(self):
return get_value(self, NOPELDET)
def ensure_datetime(d):
"""
Takes a date or a datetime as input, outputs a datetime
"""
if isinstance(d, datetime):
return d
return datetime(d.year, d.month, d.day)
def hitung_denda(piutang_pokok, jatuh_tempo, tanggal=None, max_denda=24):
persen_denda = 2
# jatuh_tempo = jatuh_tempo.date()
if not tanggal:
tanggal = datetime.now().date()
if ensure_datetime(tanggal) < ensure_datetime(jatuh_tempo): # + timedelta(days=1):
return 0
kini = datetime.now()
x = (kini.year - jatuh_tempo.year) * 12
y = kini.month - jatuh_tempo.month
bln_tunggakan = x + y + 1
if kini.day <= jatuh_tempo.day:
bln_tunggakan -= 1
if bln_tunggakan < 1:
bln_tunggakan = 0
if bln_tunggakan > max_denda:
bln_tunggakan = max_denda
return int(ceil(bln_tunggakan * persen_denda / 100.0 * piutang_pokok))
def nop_formatted(row):
if type(row) == str:
row = FixNop(row)
return "%s.%s-%s.%s-%s.%s.%s" % (row.kd_propinsi, row.kd_dati2, row.kd_kecamatan,
row.kd_kelurahan, row.kd_blok, row.no_urut, row.kd_jns_op)
def nop_to_id(row):
if type(row) in (StringType, UnicodeType):
row = FixNop(row)
return "%s%s%s%s%s%s%s" % (row.kd_propinsi, row.kd_dati2, row.kd_kecamatan,
row.kd_kelurahan, row.kd_blok, row.no_urut, row.kd_jns_op)
# JEnis REstitusi Kompensasi
JNS_RESKOM = (
(0, 'Pilih Jenis'),
(1, 'Restitusi'),
(2, 'Kompensasi'),
(3, 'Disumbangkan'),
(4, 'Koreksi'),)
JENIS_ID = (
(1, 'Tagihan'),
(2, 'Piutang'),
(3, 'Ketetapan'))
SUMBER_ID = (
(4, 'Manual'),
(1, 'PBB'),
)
# def daftar_tp():
# return pbbDBSession.query(TempatPembayaran.kd_tp,TempatPembayaran.nm_tp).\
# order_by(TempatPembayaran.kd_tp).all()
STATUS_KOLEKTIF = (
(0, 'Individu'),
(1, 'Kolektif'),
)
JENIS_PENGURANGAN = (
(0, '(None)'),
(1, 'Pengurangan Permanen'),
(2, 'Pengurangan PST'),
(3, 'Pengurangan Pengenaan JPB'),
(5, 'Pengurangan Sebelum SPPT Terbit')
)
JENIS_SALINAN = (
('0', 'SPPT'),
('1', 'SKP Terhadap SPOP'),
('2', 'SKP Kurang Bayar')
)
def get_date_triwulan(tahun, tri):
tglawal = tahun
tglakhir = tahun
simbol = ""
if tri == "4":
tglawal += "-10-01"
tglakhir += "-12-31"
simbol = "IV"
elif tri == "2":
tglawal += "-04-01"
tglakhir += "-06-30"
simbol = "II"
elif tri == "3":
tglawal += "-07-01"
tglakhir += "-09-30"
simbol = "III"
else:
tglawal += "-01-01"
tglakhir += "-03-31"
simbol = "I"
return [tglawal, tglakhir, simbol]
BUKUS = {
'11': ("Buku 1", 1, 100000),
'12': ("Buku 1,2", 1, 500000),
'13': ("Buku 1,2,3", 1, 2000000),
'14': ("Buku 1,2,3,4", 1, 5000000),
'15': ("Buku 1,2,3,4,5", 1, 999999999),
'22': ("Buku 2", 100001, 500000),
'23': ("Buku 2,3", 100001, 2000000),
'24': ("Buku 2,3,4", 100001, 5000000),
'25': ("Buku 2,3,4,5", 100001, 999999999),
'33': ("Buku 3", 500001, 2000000),
'34': ("Buku 3,4", 500001, 5000000),
'35': ("Buku 3,4,5", 500001, 999999999),
'44': ("Buku 4", 2000001, 5000000),
'45': ("Buku 4,5", 2000001, 999999999),
'55': ("Buku 5", 5000001, 999999999),
}
def bukus_sorted():
from operator import itemgetter
return sorted(BUKUS.items(), key=itemgetter(1))
def pbb_biller():
settings = get_settings()
return settings['pbb_biller']
def pbb_kantor():
settings = get_settings()
return settings['pbb_kantor']
def pbb_pemda():
settings = get_settings()
return settings['pbb_pemda']
def filter_nop(qry, nop, tahun=None):
nop = FixNop(nop)
qry = qry.filter_by(
kd_propinsi=nop["kd_propinsi"],
kd_dati2=nop["kd_dati2"],
kd_kecamatan=nop["kd_kecamatan"],
kd_kelurahan=nop["kd_kelurahan"],
kd_blok=nop["kd_blok"],
no_urut=nop["no_urut"],
kd_jns_op=nop["kd_jns_op"], )
if tahun:
qry = qry.filter_by(thn_pajak_sppt=tahun)
return qry
# def filter_nop(qry, param, cls=None):
# nop = FixNop(param)
# if cls:
# return qry.filter(
# cls.kd_propinsi==nop["kd_propinsi"],
# cls.kd_dati2==nop["kd_dati2"],
# cls.kd_kecamatan==nop["kd_kecamatan"],
# cls.kd_kelurahan==nop["kd_kelurahan"],
# cls.kd_blok==nop["kd_blok"],
# cls.no_urut==nop["no_urut"],
# cls.kd_jns_op==nop["kd_jns_op"],)
#
# return qry.filter_by(
# kd_propinsi=nop["kd_propinsi"],
# kd_dati2=nop["kd_dati2"],
# kd_kecamatan=nop["kd_kecamatan"],
# kd_kelurahan=nop["kd_kelurahan"],
# kd_blok=nop["kd_blok"],
# no_urut=nop["no_urut"],
# kd_jns_op=nop["kd_jns_op"],
# )
def pbb_nip(user_name):
if user_name == 'admin' or user_name == 'pbb-admin':
return '060000000000000000'
from opensipkd.pbb.models import PbbDBSession, DatLogin
row = PbbDBSession.query(DatLogin). \
filter_by(nm_login=user_name.upper()).first()
if row:
return row.nip
return
\ No newline at end of file
import os
import sys
import ntpath
import csv
import io
from datetime import datetime
from z3c.rml import rml2pdf
import subprocess
import logging
from ..tools import get_settings
log = logging.getLogger(__name__)
rpt_path = ""
def get_root_path():
_here = os.path.dirname(__file__)
return _here
def get_logo():
path = os.path.join(os.path.dirname(get_root_path()), 'static/img')
# FIXME opensipkd/static to be changed by active app folder
return path + "/logo.png", path + "/line.png"
def waktu():
return datetime.now().strftime('%d-%m-%Y %H:%M')
def open_rml_row(row_tpl_filename):
f = open(rpt_path + row_tpl_filename)
row_rml = f.read()
f.close()
return row_rml
def open_rml_pdf(tpl_filename, **kwargs):
# out_filename = 'out_filename' in kwargs and kwargs['out_filename'] or tpl_filename
pdf_filename = tpl_filename + '.pdf'
f = open(rpt_path + tpl_filename)
rml = f.read()
f.close()
logo, line = get_logo() # path + "/img/logo.png"
params = {}
for key, value in kwargs.items():
params[key] = value
rml = rml.format(waktu=waktu(),
logo=logo,
line=line, **kwargs)
pdf = rml2pdf.parseString(rml)
return pdf, pdf_filename
def openfile_response(request, filepath):
response = request.response
# import magic
# ctype = magic.from_file(filepath, mime=True)
f = open(filepath, 'rb')
filename = ntpath.basename(filepath)
import mimetypes
ctype = mimetypes.MimeTypes().guess_type(filename)[0]
response.content_type = str(ctype)
response.content_disposition = 'filename=' + filename
response.write(f.read())
return response
def file_response(request, f, filename, filetype=None):
"""
:param request:
:param f: object file
:param filename:
:param filetype: type of file
:return: object response
"""
import ntpath
if not f:
f = open(filename, 'rb')
fname = ntpath.basename(filename)
else:
fname = filename
if not filetype:
t = fname.split('.')
filetype = ''.join(t[-1:])
response = request.response
response.content_type = "application/" + filetype
response.content_disposition = 'filename=' + fname
if filetype == "txt":
response.charset = "UTF-8"
response.write(f.read())
return response
def set_response(request, filename):
ext = filename.split('.')[-1]
if ext in ['gif', 'png', 'jpeg', 'jpg']:
ext = 'image/' + ext
elif ext in ['pdf']:
ext = 'application/' + ext
with open(filename, 'rb') as f:
return file_response(request, f, filename, ext)
def pdf_response(request, f=None, filename=None):
return file_response(request, f, filename, 'pdf')
def odt_response(request, f, filename):
return file_response(request, f, filename, 'odt')
def odt_export1(request, filename, file_type):
settings = get_settings()
if 'unoconv_py' in settings and settings['unoconv_py']:
unoconv_py = settings['unoconv_py']
if 'unoconv_bin' in settings and settings['unoconv_bin']:
unoconv_bin = settings['unoconv_bin']
f = '.'.join([filename, 'odt'])
subprocess.call([unoconv_py, unoconv_bin, file_type, f])
out_filename = '.'.join([filename, file_type])
with open(out_filename, 'rb') as f:
return file_response(request, f, out_filename, file_type)
def odt_export_(filename, file_type, password=None):
log.info("Start Export {}".format(filename))
settings = get_settings()
import getpass
username = getpass.getuser()
odt_file = '.'.join([filename, 'odt'])
out_dir = os.path.dirname(filename)
if 'unoconv_py' in settings and settings['unoconv_py']:
log.info("Unoconv PY {}".format(filename))
unoconv_py = settings['unoconv_py']
if 'unoconv_bin' in settings and settings['unoconv_bin']:
unoconv_bin = settings['unoconv_bin']
else:
unoconv_bin = ''
params = [unoconv_py, unoconv_bin, '-f', file_type]
if password:
params.extend(['-e', 'EncryptFile=True',
'-e', 'DocumentOpenPassword=' + password])
params.append(odt_file)
log.info("DEBUG EXPORT>>{}".format(' '.join(params)))
subprocess.call(params)
# convert using bin
else:
log.info("Unoconv BIN {}".format(filename))
if 'unoconv_bin' in settings and settings['unoconv_bin']:
unoconv_bin = settings['unoconv_bin']
params = [unoconv_bin,
'-env:UserInstallation=file:///tmp/' + username,
'--headless', '--convert-to', 'pdf']
params.extend(['--outdir', out_dir, file_type, odt_file])
log.info("DEBUG EXPORT>>{}".format(' '.join(params)))
subprocess.call(params)
out_file = '.'.join([filename, file_type])
if not os.path.isfile(odt_file):
log.info("ODT FILE NOTFOUND")
return dict(error=dict(code=-1,
message='File %s tidak ditemukan ' % odt_file))
else:
if not os.path.isfile(out_file):
return dict(error=dict(code=-1,
message='File %s tidak ditemukan ' % out_file))
os.remove(odt_file)
return dict(filename=out_file)
def odt_export(request, filename, file_type):
results = odt_export_(filename, file_type)
# bug key error: code
# if not results['code']:
if 'error' in results:
return results
# out_filename = '.'.join([filename, file_type])
# out_filename = '.'.join([results['filename'], file_type])
out_filename = results['filename']
if not os.path.isfile(out_filename):
return dict(error=True,
msg='Error %s tidak ditemukan ' % out_filename)
with open(out_filename, 'rb') as f:
return file_response(request, f, out_filename, file_type)
def odt_export2(request, filename, file_type):
settings = get_settings()
if 'unoconv_py' in settings and settings['unoconv_py']:
if 'unoconv_bin' in settings and settings['unoconv_bin']:
unoconv_py = settings['unoconv_py']
unoconv_bin = settings['unoconv_bin']
f = '.'.join([filename, 'odt'])
subprocess.call([unoconv_bin,
'-env:UserInstallation=file:///tmp/libO_udir',
'--headless', '--convert-to', file_type,
'--outdir', '/tmp', f])
out_filename = '.'.join([filename, file_type])
with open(out_filename, 'rb') as f:
return file_response(request, f, out_filename, file_type)
def csv_rows(query):
row = query.first()
header = row.keys()
rows = []
for item in query.all():
rows.append(list(item))
return dict(header=header,
rows=rows)
def csv_response(request, value, filename):
response = request.response
response.content_type = 'text/csv'
# response.content_disposition = 'attachment;filename=' + filename
response.content_disposition = 'filename=' + filename
if sys.version_info < (3,):
import StringIO
fout = StringIO.StringIO()
else:
fout = io.StringIO()
fcsv = csv.writer(fout, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
fcsv.writerow(value.get('header', []))
fcsv.writerows(value.get('rows', []))
response.write(fout.getvalue())
return response
def terbilang(bil):
angka = ["", "Satu", "Dua", "Tiga", "Empat", "Lima", "Enam", "Tujuh", "Delapan", "Sembilan", "Sepuluh", "Sebelas"]
hasil = " "
n = int(bil)
if 0 <= n <= 11:
hasil = hasil + angka[n]
elif n < 20:
hasil = terbilang(n % 10) + " Belas"
elif n < 100:
hasil = terbilang(n / 10) + " Puluh" + terbilang(n % 10)
elif n < 200:
hasil = " Seratus" + terbilang(n - 100)
elif n < 1000:
hasil = terbilang(n / 100) + " Ratus" + terbilang(n % 100)
elif n < 2000:
hasil = " Seribu" + terbilang(n - 1000)
elif n < 1000000:
hasil = terbilang(n / 1000) + " Ribu" + terbilang(n % 1000)
elif n < 1000000000:
hasil = terbilang(n / 1000000) + " Juta" + terbilang(n % 1000000)
else:
hasil = terbilang(n / 1000000000) + " Miliar" + terbilang(n % 1000000000)
return hasil
def ods_export(request, filename, file_type):
settings = get_settings()
if 'unoconv_py' in settings and settings['unoconv_py']:
if 'unoconv_bin' in settings and settings['unoconv_bin']:
unoconv_py = settings['unoconv_py']
unoconv_bin = settings['unoconv_bin']
f = '.'.join([filename, 'ods'])
subprocess.call([unoconv_py, unoconv_bin, '-f', file_type, f])
out_filename = '.'.join([filename, file_type])
with open(out_filename, 'rb') as f:
return file_response(request, f, out_filename, file_type)
class Item(object):
pass
from pyramid.events import (
subscriber,
BeforeRender,
)
from pyramid.i18n import default_locale_negotiator
from pyramid.interfaces import IAuthenticationPolicy, IAuthorizationPolicy
from pyramid.security import Allowed
from pyramid.threadlocal import get_current_registry
LOCALE_NAMES = {
'en': 'English',
'id': 'Indonesia',
}
LOCALE_IDS = list(LOCALE_NAMES.keys())
def get_settings():
return get_current_registry().settings
def get_locale_should_be(request):
if 'HTTP_ACCEPT_LANGUAGE' not in request.environ:
return 'id'
# id-ID,id;q=0.9,en-US;q=0.8,en;q=0.7
vals = request.environ['HTTP_ACCEPT_LANGUAGE'].split(',')
for val in vals:
for locale_id in LOCALE_IDS:
if val.find(locale_id) == 0:
return locale_id
def get_locale_name(request):
return default_locale_negotiator(request) or \
get_locale_should_be(request)
def get_locale_title(name):
if name in LOCALE_NAMES:
return LOCALE_NAMES[name]
return name
def get_locale_title_(request):
name = get_locale_name(request)
return get_locale_title(name)
# def api_has_permission_(request, permission, context=None):
# if context is None:
# context = request.context
# reg = request.registry
# authn_policy = reg.queryUtility(IAuthenticationPolicy)
# if authn_policy is None:
# return Allowed('No authentication policy in use.')
# authz_policy = reg.queryUtility(IAuthorizationPolicy)
# if authz_policy is None:
# raise ValueError(
# 'Authentication policy registered without '
# 'authorization policy'
# ) # should never happen
# principals = authn_policy.effective_principals(request)
# return authz_policy.permits(context, principals, permission)
@subscriber(BeforeRender)
def add_global(event):
event['locale_name'] = get_locale_title_
event['locale_should_be'] = get_locale_should_be
# event['api_has_permission'] = api_has_permission_
from .validator import FormValidator
\ No newline at end of file
class BaseValidation(object):
def __init__(self, cldr, params=None):
if params is None:
params = {}
self.colander_invalid = cldr
self.inputname = ('inputname' in params) and params['inputname'] or ''
self.title = self.inputname.replace('_', ' ').title().replace(' ', '')
self.value = None
if 'inputvalue' in params:
self.value = params['inputvalue']
self.required = ('required' in params and params['required'] == True)
self.rulemessage = ('message' in params) and params['message'] or None
self.rulevalue = None
if 'rule_value' in params:
self.rulevalue = params['rule_value']
self.allvalues = ('all_values' in params) and params['all_values'] or {}
def value_type(self):
if type(self.value) is int:
return 'integer'
elif type(self.value) is float:
return float
return 'string'
from .base_validation import BaseValidation
class ValidationConfirmed(BaseValidation):
def __init__(self, cldr, params = {}):
super(ValidationConfirmed, self).__init__(cldr = cldr, params = params)
self.__confirmname = 'confirmation_' + self.inputname
self.__confirmvalue = (self.__confirmname in self.allvalues) and self.allvalues[self.__confirmname] or None
self.__message = self.rulemessage or 'Konfirmasi :attribute tidak sama.'
def validate(self):
ok = (str(self.value) == str(self.__confirmvalue))
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':attribute', self.title)
return ok
"""
eg:
def form_validator(form, value):
cldr = colander.Invalid(form, 'Form input harus diisi')
validations = {
'password': {
'rules' : confirmed', # artinya field ini membutuhkan field 'confirmation_password' dan isinya harus sama. persis seperti register
'messages': {
'confirmed' : 'Password tidak sama dengan Konfirmasi Password',
}
},
'confirmation_password': {
'rules' : 'required',
'messages': {
'required' : 'Konfirmasi Password harus diisi',
}
}
}
valid = FormValidator(
cldr = cldr,
inputvalues = value,
validations = validations
)
valid.validate()
"""
\ No newline at end of file
from datetime import (
datetime,
date
)
from .base_validation import BaseValidation
class ValidationDateTime(BaseValidation):
def __init__(self, cldr, params = {}):
super(ValidationDateTime, self).__init__(cldr = cldr, params = params)
self.__message = self.rulemessage or 'Format :attribute bukan tidak benar. Format :format'
def __get_datevalue(self):
try:
dt = datetime.strptime(str(self.value), str(self.rulevalue))
except Exception as e:
dt = None
return dt
def validate(self):
ok = True
if isinstance(self.value, datetime) or isinstance(self.value, date):
return ok
if (self.value is None) or (self.value == ''):
ok = not self.required
else:
dt = self.__get_datevalue()
ok = (dt is not None)
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':format', str(self.rulevalue).replace('%', '')).replace(':attribute', self.title)
return ok
"""
eg:
def form_validator(form, value):
cldr = colander.Invalid(form, 'Form input harus diisi')
validations = {
'tgl_lahir': {
'rules': 'date:%d-%m-%Y',
'messages': {
'date' : 'Format Tanggal Lahir salah. Gunakan format dd-mm-yyyy',
}
}
}
valid = FormValidator(
cldr = cldr,
inputvalues = value,
validations = validations
)
valid.validate()
"""
\ No newline at end of file
from sqlalchemy.orm.attributes import InstrumentedAttribute
from .base_validation import BaseValidation
class ValidationDB(object):
__invalid_key = 'DB VALIDATION ERROR'
def __init__(self, inputname, value, params):
if not isinstance(params, dict):
self.__invalid_rule_value()
self.__inputname = inputname
self.__value = value
self.__params = params
# get value berdasarkan key dari params
def __get_key(self, key):
return key in self.__params and self.__params[key] or None
# raise error dgn argument msg
def __raise_invalid(self, msg):
raise Exception(self.__invalid_key, msg)
# raise error karena rule value salah format
def __invalid_rule_value(self):
self.__raise_invalid('Rule harus berupa "dictionary object".')
# raise error karena tidak ada key connection (database session) dalam params
def __invalid_db_session(self):
self.__raise_invalid('DB Session dibutuhkan dalam validasi.')
# raise error karena tidak ada key table dalam params
def __invalid_table(self):
self.__raise_invalid('Nama Table dibutuhkan dalam validasi.')
# raise error karena field tidak ada dalam table
def __invalid_field(self):
self.__raise_invalid('Field tidak ditemukan.')
# raise error karena format condition harus dict object
def __invalid_condition(self):
self.__raise_invalid('Condition value harus berupa "list". Contoh: [table.id == id, table.kode == kode]')
def __invalid_method(self):
self.__raise_invalid('Query method tidak dapat diproses.')
# get dbSession / connection
def __db_session(self):
val = self.__get_key('session')
if not val:
self.__invalid_db_session()
return val
# get tablename
def __db_table(self):
val = self.__get_key('table')
if not val:
self.__invalid_table()
return val
def __fix_field(self, field):
table = self.__db_table()
if not isinstance(field, InstrumentedAttribute):
if isinstance(field, str):
field = getattr(table, field, None)
else:
field = None
if not field:
self.__invalid_field()
# check existensi field di table
# ambil nama field stringnya
name = field.name
if (name == '(no name)') or (not name) or (not getattr(table, name, None)):
self.__invalid_field()
return field
# get field yang akan dicek uniknya
def __field(self):
field = self.__get_key('field') or self.__inputname
return self.__fix_field(field)
# get condition / filter dari table
def __conditions(self):
return self.__get_key('conditions')
# query berdasarkan field yang akan dicek uniknya
def query(self):
session = self.__db_session()
table = self.__db_table()
# field = self.__field()
# return session.query(table).filter(field == self.__value)
return session.query(table)
# query setelah diambil query dari field yang harus unik berdasarkan kondisi keunikanya
def query_condition(self, query=None):
method = self.__get_key('method')
if method != 'query':
if not query:
query = self.query()
cond = self.__conditions()
if cond:
if not isinstance(cond, list):
self.__invalid_condition()
# query = apply_filters(query, cond)
query = query.filter(*cond)
else:
field = self.__field()
query = query.filter(field == self.__value)
else:
query = self.__get_key('query')
if not query:
self.__invalid_method()
return query
# validasi untuk fields yg unique
class ValidationUniqueDB(BaseValidation, ValidationDB):
__invalid_key = 'VALIDATION "UNIQUE_DB" ERROR'
def __init__(self, cldr, params=None):
BaseValidation.__init__(self, cldr=cldr, params=params)
ValidationDB.__init__(self, inputname=self.inputname, value=self.value, params=self.rulevalue)
self.__message = self.rulemessage or ':attribute sudah digunakan.'
# return True jika data tidak ada. raise form (False) jika sudah ada
def validate(self):
query = self.query_condition()
data = query.first()
ok = (data is None)
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':attribute', self.title)
return ok
# validasi untuk existensi data
class ValidationExistsDB(BaseValidation, ValidationDB):
__invalid_key = 'VALIDATION "EXIST_DB" ERROR'
def __init__(self, cldr, params=None):
BaseValidation.__init__(self, cldr=cldr, params=params)
ValidationDB.__init__(self, inputname=self.inputname, value=self.value, params=self.rulevalue)
self.__message = self.rulemessage or ':attribute tidak tersedia.'
# return True jika data ada. raise form (False) jika data tidak ada
def validate(self):
query = self.query_condition()
data = query.first()
ok = (data is not None)
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':attribute', self.title)
return ok
from .base_validation import BaseValidation
class ValidationDigit(BaseValidation):
def __init__(self, cldr, params = {}):
super(ValidationDigit, self).__init__(cldr = cldr, params = params)
self.__message = self.rulemessage or ':attribute harus berupa digit angka.'
def validate(self):
ok = True
if (self.value is None) or (self.value == ''):
ok = not self.required
else:
ok = str(self.value).isdigit()
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':attribute', self.title)
return ok
\ No newline at end of file
import re
from .base_validation import BaseValidation
class ValidationEmail(BaseValidation):
def __init__(self, cldr, params = {}):
super(ValidationEmail, self).__init__(cldr = cldr, params = params)
self.__message = self.rulemessage or 'Format email :attribute tidak dapat diterima.'
def validate(self):
ok = True
if (self.value is None) or (self.value == ''):
ok = not self.required
else:
val = str(self.value)
ver = re.match(r'(\w((\.|\-)?\w+)+@(\w(\.|\-)?\w+)+(?:\.\w(\-?\w+)+)+)', val)
ok = (ver is not None)
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':attribute', self.title)
return ok
\ No newline at end of file
from .base_validation import BaseValidation
class ValidationFloat(BaseValidation):
def __init__(self, cldr, params = {}):
super(ValidationFloat, self).__init__(cldr = cldr, params = params)
self.__message = self.rulemessage or ':attribute harus berupa angka bilangan bulat atau desimal.'
def validate(self):
ok = True
if (self.value is None) or (self.value == ''):
ok = not self.required
else:
try:
val = float(self.value)
except ValueError:
ok = False
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':attribute', self.title)
return ok
\ No newline at end of file
from .base_validation import BaseValidation
class ValidationGreater(BaseValidation):
def __init__(self, cldr, params=None):
super(ValidationGreater, self).__init__(cldr=cldr, params=params)
self.__message = self.rulemessage or 'Value :attribute harus lebih dari :greater.'
def validate(self):
if (self.value is None) or (self.value == ''):
ok = not self.required
else:
# ok = (str(self.value) > str(self.rulevalue))
vt = self.value_type()
if vt == 'string':
ok = (str(self.value) > str(self.rulevalue))
else:
v = vt == 'float' and float(self.value) or int(self.value)
r = vt == 'float' and float(self.rulevalue) or int(self.rulevalue)
ok = (v > r)
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':greater', str(self.rulevalue)).replace(':attribute', self.title)
return ok
from .base_validation import BaseValidation
class ValidationGreaterField(BaseValidation):
def __init__(self, cldr, params=None):
super(ValidationGreaterField, self).__init__(cldr=cldr, params=params)
self.__greatervalue = None
if self.rulevalue in self.allvalues:
self.__greatervalue = self.allvalues[self.rulevalue]
self.__message = self.rulemessage or 'Value :attribute harus lebih dari :greater_field.'
def validate(self):
if (self.value is None) or (self.value == ''):
ok = not self.required
else:
# ok = ((self.__greatervalue is not None) and (str(self.value) > str(self.__greatervalue)))
ok = False
if self.__greatervalue is not None:
vt = self.value_type()
if vt == 'string':
ok = (str(self.value) > str(self.rulevalue))
else:
# ok = (self.value > self.rulevalue)
v = vt == 'float' and float(self.value) or int(self.value)
r = vt == 'float' and float(self.__greatervalue) or int(self.__greatervalue)
ok = (v > r)
if not ok:
gvalue = (self.__greatervalue is not None) and str(self.__greatervalue) or 'UNKNOWN'
self.colander_invalid[self.inputname] = self.__message.replace(':greater_field', gvalue).replace(':attribute', self.title)
return ok
from .base_validation import BaseValidation
class ValidationInteger(BaseValidation):
def __init__(self, cldr, params = {}):
super(ValidationInteger, self).__init__(cldr = cldr, params = params)
self.__message = self.rulemessage or ':attribute harus berupa angka bilangan bulat.'
def validate(self):
ok = True
if (self.value is None) or (self.value == ''):
ok = not self.required
else:
try:
val = int(self.value)
except ValueError:
ok = False
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':attribute', self.title)
return ok
\ No newline at end of file
from .base_validation import BaseValidation
class ValidationLess(BaseValidation):
def __init__(self, cldr, params=None):
super(ValidationLess, self).__init__(cldr=cldr, params=params)
self.__message = self.rulemessage or 'Value :attribute harus lebih dari :greater.'
def validate(self):
if (self.value is None) or (self.value == ''):
ok = not self.required
else:
# ok = (str(self.value) < str(self.rulevalue))
vt = self.value_type()
if vt == 'string':
ok = (str(self.value) < str(self.rulevalue))
else:
v = vt == 'float' and float(self.value) or int(self.value)
r = vt == 'float' and float(self.rulevalue) or int(self.rulevalue)
ok = (v < r)
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':greater', str(self.rulevalue)).replace(':attribute', self.title)
return ok
from .base_validation import BaseValidation
class ValidationLessField(BaseValidation):
def __init__(self, cldr, params=None):
super(ValidationLessField, self).__init__(cldr=cldr, params=params)
self.__lessvalue = None
if self.rulevalue in self.allvalues:
self.__lessvalue = self.allvalues[self.rulevalue]
self.__message = self.rulemessage or 'Value :attribute harus lebih dari :less_field.'
def validate(self):
if (self.value is None) or (self.value == ''):
ok = not self.required
else:
# ok = ((self.__lessvalue != None) and (str(self.value) < str(self.__lessvalue)))
ok = False
if self.__lessvalue is not None:
vt = self.value_type()
if vt == 'string':
ok = (str(self.value) > str(self.rulevalue))
else:
# ok = (self.value > self.rulevalue)
v = vt == 'float' and float(self.value) or int(self.value)
r = vt == 'float' and float(self.__lessvalue) or int(self.__lessvalue)
ok = (v > r)
if not ok:
lvalue = (self.__lessvalue is not None) and str(self.__lessvalue) or 'UNKNOWN'
self.colander_invalid[self.inputname] = self.__message.replace(':less_field', lvalue).replace(':attribute', self.title)
return ok
from .base_validation import BaseValidation
class ValidationList(BaseValidation):
def __init__(self, cldr, params = {}):
super(ValidationList, self).__init__(cldr = cldr, params = params)
self.__value_list = isinstance(self.rulevalue, str) and self.rulevalue.split(',') or list(self.rulevalue)
self.__message = self.rulemessage or ':attribute tidak tersedia.'
def validate(self):
ok = True
if (self.value is None) or (self.value == ''):
ok = not self.required
else:
ok = (str(self.value) in self.__value_list)
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':attribute', self.title)
return ok
\ No newline at end of file
from .base_validation import BaseValidation
class ValidationMax(BaseValidation):
def __init__(self, cldr, params=None):
super(ValidationMax, self).__init__(cldr=cldr, params=params)
if self.rulevalue == '':
raise Exception('VALIDATION MAX ERROR', '"max" value tidak dapat validasi')
vt = self.value_type()
self.__rulevalue = (vt == 'float') and float(self.rulevalue) or int(self.rulevalue)
self.__message = self.rulemessage or 'Value :attribute harus kurang dari sama dengan :max.'
def validate(self):
if (self.value is None) or (self.value == ''):
ok = not self.required
else:
if self.value_type() == 'string':
ok = (len(self.value) <= self.__rulevalue)
else:
ok = (self.value <= self.__rulevalue)
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':max', str(self.rulevalue)).replace(':attribute', self.title)
return ok
from .base_validation import BaseValidation
class ValidationMin(BaseValidation):
def __init__(self, cldr, params=None):
super(ValidationMin, self).__init__(cldr=cldr, params=params)
if self.rulevalue == '':
raise Exception('VALIDATION MIN ERROR', '"min" value tidak dapat validasi')
vt = self.value_type()
self.__rulevalue = (vt == 'float') and float(self.rulevalue) or int(self.rulevalue)
self.__message = self.rulemessage or 'Value :attribute harus lebih dari sama dengan :min.'
def validate(self):
if (self.value is None) or (self.value == ''):
ok = not self.required
else:
if self.value_type() == 'string':
ok = (len(self.value) >= self.__rulevalue)
else:
ok = (self.value >= self.__rulevalue)
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':min', str(self.rulevalue)).replace(':attribute', self.title)
return ok
from .base_validation import BaseValidation
class ValidationRequired(BaseValidation):
def __init__(self, cldr, params = {}):
super(ValidationRequired, self).__init__(cldr = cldr, params = params)
self.__message = self.rulemessage or ':attribute harus diisi'
def validate(self):
ok = not ((self.value is None) or (self.value == ''))
if not ok:
self.colander_invalid[self.inputname] = self.__message.replace(':attribute', self.title)
return ok
\ No newline at end of file
from .base_validation import BaseValidation
class ValidationString(BaseValidation):
def __init__(self, cldr, params = {}):
super(ValidationString, self).__init__(cldr = cldr, params = params)
self.__message = self.rulemessage or ':attribute bukan type string'
def validate(self):
return True
\ No newline at end of file
from .validation_required import ValidationRequired
from .validation_string import ValidationString
from .validation_email import ValidationEmail
from .validation_confirmed import ValidationConfirmed
from .validation_integer import ValidationInteger
from .validation_float import ValidationFloat
from .validation_digit import ValidationDigit
from .validation_min import ValidationMin
from .validation_max import ValidationMax
from .validation_list import ValidationList
from .validation_greater import ValidationGreater
from .validation_less import ValidationLess
from .validation_greater_field import ValidationGreaterField
from .validation_less_field import ValidationLessField
from .validation_datetime import ValidationDateTime
from .validation_db import (
ValidationUniqueDB,
ValidationExistsDB
)
class FormValidator(object):
__invalid_key = 'VALIDATION ERROR'
__map_validators = {
'required' : ValidationRequired,
'string' : ValidationString,
'integer' : ValidationInteger,
'float' : ValidationFloat,
'digit' : ValidationDigit,
'min' : ValidationMin,
'max' : ValidationMax,
'list' : ValidationList,
'greater' : ValidationGreater,
'less' : ValidationLess,
'greater_field' : ValidationGreaterField,
'less_field' : ValidationLessField,
'email' : ValidationEmail,
'confirmed' : ValidationConfirmed,
'datetime' : ValidationDateTime,
'unique_db' : ValidationUniqueDB,
'exists_db' : ValidationExistsDB
}
__map_rules_name = {
'required' : 'required',
'string' : 'string',
'str' : 'string',
'integer' : 'integer',
'int' : 'integer',
'float' : 'float',
'double' : 'float',
'decimal' : 'float',
'number' : 'float',
'digit' : 'digit',
'min' : 'min',
'max' : 'max',
'list' : 'list',
'greater' : 'greater',
'less' : 'less',
'greater_field' : 'greater_field',
'less_field' : 'less_field',
'email' : 'email',
'confirmed' : 'confirmed',
'datetime' : 'datetime',
'date' : 'datetime',
'unique_db' : 'unique_db',
'unique' : 'unique_db',
'exists_db' : 'exists_db',
'exists' : 'exists_db'
}
__boolean_rules = [
'required',
'string',
'integer',
'float',
'email',
'confirmed',
]
def __init__(self, cldr, inputvalues, validations):
self.__colander_invalid = cldr
self.__validations = validations
self.__messages = {}
self.__values = inputvalues
self.__global_rules = {}
self.__validators = []
self.__todict_rules()
self.__bind_validator()
def __fix_rulename(self, rule_key):
return (rule_key in self.__map_rules_name) and self.__map_rules_name[rule_key] or None
def __is_boolean_rule(self, rule_name):
return (rule_name in self.__boolean_rules)
def __todict_rules(self):
def fromstr(rs, cur_index = 0):
result = {}
for x in list(rs.split('|')):
rs = x.split(':')
key = rs[0]
val = (len(rs) > 1) and rs[1] or True
result.update({key: val})
result.update({cur_index: (key, val)})
cur_index += 1
return result
def fromdict(rs, cur_index = 0):
result = {}
for s in rs:
result.update({s: rs[s]})
result.update({cur_index: (s, rs[s])})
cur_index += 1
return result
def fromlist(rs, cur_index = 0):
result = {}
for rxs in rs:
if not isinstance(rxs, (str, dict)):
continue
if isinstance(rxs, str):
ds = fromstr(rxs, cur_index)
else:
ds = fromdict(rxs, cur_index)
result.update(ds)
return result
for inputname, validation in self.__validations.items():
dict_rules = None
rule = ('rules' in validation) and (validation['rules']) or None
if not rule:
continue
if isinstance(rule, str):
dict_rules = fromstr(rule)
elif isinstance(rule, dict):
dict_rules = fromdict(rule)
elif isinstance(rule, list):
dict_rules = fromlist(rule)
if dict_rules:
self.__global_rules[inputname] = dict_rules
self.__messages[inputname] = ('messages' in validation) and validation['messages'] or {}
def __get_validator(self, inputname, inputvalue, rule_key, rule_name, rule_value = None, isrequired = False, messages = {}):
message = (rule_key in messages) and messages[rule_key] or None
if message is None:
message = (rule_name in messages) and messages[rule_name] or None
params = {
'inputname' : inputname,
'inputvalue' : inputvalue,
'rule_value' : rule_value,
'required' : isrequired,
'message' : message,
'all_values' : self.__values
}
validator = None
val_class = (rule_name in self.__map_validators) and self.__map_validators[rule_name] or None
if val_class:
validator = val_class(cldr = self.__colander_invalid, params = params)
else:
raise Exception('Validation Error', 'validation rule "' + rule_key + '" tidak tersedia')
return validator
def __bind_validator(self):
for inputname, rules in self.__global_rules.items():
rules_validator = []
inputvalue = None
if inputname in self.__values:
inputvalue = self.__values[inputname]
isrequired = ('required' in rules and rules['required'] == True)
messages = (inputname in self.__messages) and self.__messages[inputname] or {}
for rule_key, rule_value in rules.items():
if not isinstance(rule_key, int):
continue
rss = rules[rule_key]
key = rss[0]
value = rss[1]
rule_name = self.__fix_rulename(key)
if self.__is_boolean_rule(rule_name) and value != True:
continue
validator = self.__get_validator(
inputname = inputname,
inputvalue = inputvalue,
rule_key = key,
rule_name = rule_name,
rule_value = value,
isrequired = isrequired,
messages = messages
)
rules_validator.append(validator)
if rules_validator:
self.__validators.append(rules_validator)
def validate(self):
if self.__validators:
count_error = 0
for validators in self.__validators:
for validator in validators:
if not validator:
continue
if not validator.validate():
count_error += 1
break
if count_error > 0:
raise self.__colander_invalid
setuptools>=40.8.0
pip>=21.2.4
WebOb>=1.8.7
certifi>=2021.5.30
pyramid>=2.0
pyramid_rpc
venusian>=3.0.0
translationstring>=1.4
plaster>=1.0
hupper>=1.10.3
urllib3>=1.26.6
idna>=3.2
pikepdf
z3c.rml
import os
from setuptools import setup, find_packages
here = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(here, 'README.rst')) as f:
README = f.read()
with open(os.path.join(here, 'CHANGES.txt')) as f:
CHANGES = f.read()
line = CHANGES.splitlines()[0]
version = line.split()[0]
requires = [
]
dev_requires = [
'pyramid_debugtoolbar',
'pytest',
]
setup(
name='opensipkd-tools',
version=version,
description='Tools openSIPKD',
long_description=README + '\n\n' + CHANGES,
author='Agus Gustiana',
author_email='aa.gustiana@gmail.com',
classifiers=[
"Programming Language :: Python",
"Framework :: Pylons",
"Topic :: Internet :: WWW/HTTP",
"Topic :: Internet :: WWW/HTTP :: WSGI :: Application",
],
license='Apache Software License',
keywords='web pyramid pylons base',
packages=find_packages(),
zip_safe=False,
install_requires=requires,
tests_require=requires,
extras_require={
'dev': dev_requires,
},
package_data={},
data_files=[],
include_package_data=True,
entry_points="""\
[console_scripts]
""",
)
import unittest
from opensipkd.tools import get_msisdn, should_int, thousand
class TestTools(unittest.TestCase):
def test_tools(self):
self.assertEqual(get_msisdn('081311045668')[:3], '+62')
self.assertEqual(should_int(1.0), 1)
print(thousand(10000.0))
self.assertEqual(thousand(1000.0), '1.000')
if __name__ == '__main__':
unittest.main()
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!