Commit 7eab0633 by Administrator

Update __init__.py

1 parent f4684762
from __future__ import print_function
import os
import re
import mimetypes
import csv
import calendar
import datetime
from random import choice
from string import (ascii_uppercase, ascii_lowercase, digits, )
import locale
import colander
import pytz
import io
from pyramid.httpexceptions import HTTPNotFound
from pyramid.threadlocal import get_current_registry
from json import JSONEncoder
import logging
log = logging.getLogger(__name__)
################
# Phone number #
################
MSISDN_ALLOW_CHARS = map(lambda x: str(x), range(10)) # + ['+']
def get_msisdn(msisdn, country='+62'):
"""
Digunakan untuk pengecekan no telp suatu negara
:param msisdn:
:param country:
:return: output kode negara+no msisdn
"""
# for ch in msisdn:
# if ch not in MSISDN_ALLOW_CHARS:
# raise Exception(f'{ch} not in {print(x for x in range(10))}')
try:
i = int(msisdn)
except ValueError as e:
log.info(e)
return
if not i:
raise Exception('MSISDN must filled')
if len(str(i)) < 7:
raise Exception('MSISDN less then 7 number')
if re.compile(r'^\+').search(msisdn):
return msisdn
if re.compile(r'^0').search(msisdn):
return '%s%s' % (country, msisdn.lstrip('0'))
################
# Money format #
################
def should_int(value):
int_ = int(value)
if int_ == value:
return int_
return value
def thousand(value, float_count=None):
"""
Memisahkan ribuan
:param value:
:param float_count:
:return: string
"""
if not value:
return "0"
if float_count is None: # autodetection
if type(value) in (int, float):
float_count = 0
else:
float_count = 2
return locale.format_string('%%.%df' % float_count, value, True)
def money(value, float_count=None, currency=None):
"""
Memisahkan ribuan dan menambahkan nama mata uang
:param value:
:param float_count:
:param currency:
:return:
"""
if value < 0:
v = abs(value)
format_ = '(%s)'
else:
v = value
format_ = '%s'
if currency is None:
currency = locale.localeconv()['currency_symbol']
s = ' '.join([currency, thousand(v, float_count)])
return format_ % s
def round_up(n):
i = int(n)
if n == i:
return i
if n > i:
return i + 1
return i - 1
###########
# Pyramid #
###########
def get_settings():
return get_current_registry().settings
def get_params(params, alternate=None):
"""
Digunakan untuk mengambil nilai dari konfigurasi sesuai params yang disebut
:param params: variable
:param alternate: default apabila tidak ditemukan data/params
:return: value
contoh penggunaan:
get_params('devel', False)
"""
settings = get_settings()
result = settings and params in settings and settings[
params].strip() or None
return result and result or alternate
def devel():
settings = get_settings()
is_devel = 'devel' in settings and settings['devel'] and True or False
print("DEBUG>>", is_devel)
return is_devel
def get_timezone():
settings = get_settings()
return pytz.timezone(settings['timezone'])
def get_modules():
settings = get_settings()
return settings['modules'].split(',')
########
# Time #
########
one_second = datetime.timedelta(1.0 / 24 / 60 / 60)
DateType = type(datetime.date.today())
DateTimeType = type(datetime.datetime.now())
TimeZoneFile = '/etc/timezone'
if os.path.exists(TimeZoneFile):
DefaultTimeZone = open(TimeZoneFile).read().strip()
else:
DefaultTimeZone = 'Asia/Jakarta'
def as_timezone(tz_date):
localtz = get_timezone()
if not tz_date.tzinfo:
tz_date = create_datetime(tz_date.year, tz_date.month, tz_date.day,
tz_date.hour, tz_date.minute, tz_date.second,
tz_date.microsecond)
return tz_date.astimezone(localtz)
def create_datetime(year, month, day, hour=0, minute=7, second=0,
microsecond=0):
tz = get_timezone()
dt = datetime.datetime(year, month, day, hour, minute, second, microsecond)
return tz.localize(dt)
def create_date(year, month, day):
return create_datetime(year, month, day)
def create_now():
tz = get_timezone()
return datetime.datetime.now(tz)
def date_from_str(value):
separator = None
value = value.split()[0] # dd-mm-yyyy HH:MM:SS
for s in ['-', '/', '.']:
if value.find(s) > -1:
separator = s
break
if separator:
t = list(map(lambda x: int(x), value.split(separator)))
y, m, d = t[2], t[1], t[0]
if d > 999: # yyyy-mm-dd
y, d = d, y
else:
y, m, d = int(value[:4]), int(value[4:6]), int(value[6:])
return datetime.date(y, m, d)
def time_from_str(value):
# separator = ":"
value = value.split()[1] # dd-mm-yyyy HH:MM:SS
# return value.strptime('%H:%M:%S')
h, m, s = value.split(":")
# return datetime.time(h, m, s)
return datetime.timedelta(hours=int(h), minutes=int(m), seconds=int(s))
def datetime_from_str(value):
# separator = None
dt = date_from_str(value)
tm = time_from_str(value)
return datetime.datetime(dt.year, dt.month, dt.day)+tm
def dmy(tgl):
return tgl.strftime('%d-%m-%Y')
def hms(tgl):
return tgl.strftime('%H:%M:%S')
def dmy_to_date(tgl):
return datetime.datetime.strptime(tgl, '%d-%m-%Y')
def dmyhms(tgl):
return tgl.strftime('%d-%m-%Y %H:%M:%S')
def ymd(tgl):
return tgl.strftime('%Y-%m-%d')
def ymdhms(tgl):
return tgl.strftime('%Y-%m-%d %H:%M:%S')
def dMy(tgl):
return str(tgl.day) + ' ' + NAMA_BULAN[tgl.month][1] + ' ' + str(tgl.year)
def tampil_bulan(bulan):
return NAMA_BULAN[bulan][1]
def next_month(year, month):
if month == 12:
month = 1
year += 1
else:
month += 1
return year, month
def best_date(year, month, day):
try:
return datetime.date(year, month, day)
except ValueError:
last_day = calendar.monthrange(year, month)[1]
return datetime.date(year, month, last_day)
def next_month_day(year, month, day):
year, month = next_month(year, month)
return best_date(year, month, day)
def count_day(date):
date = str(date) # 2018-01-02 list
year = int(date[:4])
if date[5] == '0':
month = int(date[6])
else:
month = int(date[5:7])
count_day = calendar.monthrange(year, month)
return int(count_day[1])
NAMA_BULAN = (
(0, "--Bulan--"),
(1, "Januari", "Jan"),
(2, "Februari", "Feb"),
(3, "Maret", "Mar"),
(4, "April", "Apr"),
(5, "Mei", "Mei"),
(6, "Juni", "Jun"),
(7, "Juli", "Jul"),
(8, "Agustus", "Agu"),
(9, "September", "Sep"),
(10, "Oktober", "Okt"),
(11, "Nopember", "Nov"),
(12, "Desember", "Des"),
)
##########
# String #
##########
def one_space(s):
s = s.strip()
while s.find(' ') > -1:
s = s.replace(' ', ' ')
return s
def to_str(v):
typ = type(v)
if typ == DateType:
return dmy(v)
if typ == DateTimeType:
return dmyhms(v)
if v == 0:
return '0'
if typ == str:
return v.strip()
elif typ == bool:
return v and '1' or '0'
return v and str(v) or ''
def dict_to_str(d):
r = {}
for key in d:
val = d[key]
r[key] = to_str(val)
return r
def split(s, c=4):
r = []
while s:
t = s[:c]
r.append(t)
s = s[c:]
return ' '.join(r)
def url_join(*args):
url = ''
for arg in args:
if arg:
aurl = arg.strip('/')
url = url + aurl + '/'
url = url.rstrip('/')
return url
def upper_dict(d):
new_dict = dict((str(k).upper(), v) for k, v in d.items())
return new_dict
########
# File #
########
# http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
def get_random_string(width=6):
return ''.join(choice(ascii_uppercase + ascii_lowercase + digits) \
for _ in range(width))
def get_random_number(width=12):
return ''.join(choice(digits) \
for _ in range(width))
def get_ext(filename):
return os.path.splitext(filename)[-1]
def get_filename(filename):
return "".join(os.path.splitext(filename)[:-1])
def file_type(filename):
ctype, encoding = mimetypes.guess_type(filename)
if ctype is None or encoding is not None:
ctype = 'application/octet-stream'
return ctype
class SaveFile(object):
def __init__(self, dir_path):
self.dir_path = dir_path
if not os.path.exists(dir_path):
os.makedirs(dir_path)
# Unchanged file extension, and make file prefix unique with sequential
# number.
def create_fullpath(self, ext=''):
while True:
filename = get_random_string() + ext
full_path = os.path.join(self.dir_path, filename)
if not os.path.exists(full_path):
return full_path
def save(self, content, ext='', filename=None):
if filename:
fullpath = os.path.join(self.dir_path, filename)
else:
fullpath = self.create_fullpath(ext=ext)
f = open(fullpath, 'wb')
if type(content) == io.BytesIO:
f.write(content.getbuffer())
else:
f.write(content)
f.close()
return fullpath
class InvalidExtension(Exception):
def __init__(self, exts):
self.error = f"Supported extensions {exts}"
class MemoryTmpStore(dict):
""" Instances of this class implement the
:class:`defWorm.interfaces.FileUploadTempStore` interface"""
def preview_url(self, uid):
return None
mem_tmp_store = MemoryTmpStore()
img_exts = ['.png', '.jpg', '.pdf', '.jpeg']
def image_validator(node, value):
ext = get_ext(value["filename"])
if ext not in img_exts:
raise colander.Invalid(node,
f'Extension harus salahsatu dari {img_exts}')
def file_response(request, f, filename, type):
response = request.response
response.content_type = str(type)
response.content_disposition = 'filename=' + filename
response.write(f.read())
return response
class Upload(SaveFile):
def save_to_file(self, input_file, ext, filename=None):
if filename:
fullpath = os.path.join(self.dir_path, filename, ext)
else:
fullpath = self.create_fullpath(ext)
output_file = open(fullpath, 'wb')
input_file.seek(0)
while True:
data = input_file.read(2 << 16)
if not data:
break
output_file.write(data)
output_file.close()
return fullpath
def save(self, request, name, exts=None, filename=None):
input_file = request.POST[name].file
ext = get_ext(request.POST[name].filename)
if exts and ext not in exts:
raise InvalidExtension(exts)
filename = self.save_to_file(input_file, ext, filename=filename)
head, filename = os.path.split(filename)
return filename
def save_fp(self, upload):
if 'fp' not in upload or upload['fp'] == b'':
if "filename" in upload:
return upload['filename']
return
filename = upload['filename']
input_file = upload['fp']
ext = get_ext(filename)
return self.save_to_file(input_file, ext)
def saves(self, uploads):
d = {}
for upload in uploads:
if 'fp' not in upload or upload['fp'] == b'':
continue
filename = upload['filename']
# input_file = upload['fp']
# ext = get_ext(filename)
# d[filename] = self.save_to_file(input_file, ext)
d[filename] = self.save_fp(upload)
return d
def download(self, request, filename):
out_filename = os.path.join(self.dir_path, filename)
if not os.path.exists(out_filename):
raise HTTPNotFound
ext = get_ext(filename)[1:]
if ext in ['gif', 'png', 'jpeg', 'jpg']:
ext = 'image/' + ext
with open(out_filename, 'rb') as f:
return file_response(request, f, filename, ext)
class UploadBin(Upload):
"""
Compatibility to previous
"""
pass
class CSVRenderer(object):
def __init__(self, info):
pass
def __call__(self, value, system):
""" Returns a plain CSV-encoded string with content-type
``text/csv``. The content-type may be overridden by
setting ``request.response.content_type``."""
request = system.get('request')
if request is not None:
response = request.response
ct = response.content_type
if ct == response.default_content_type:
response.content_type = 'text/csv'
# error ketika open csv menggunakan bytes
# karena tak mau hapus, maka dibikin try
try:
fout = io.BytesIO() #
fcsv = csv.writer(fout, delimiter=',', quotechar='"',
quoting=csv.QUOTE_MINIMAL)
fcsv.writerow(value.get('header', []))
fcsv.writerows(value.get('rows', []))
except:
fout = io.StringIO()
fcsv = csv.writer(fout, delimiter=',', quotechar='"',
quoting=csv.QUOTE_MINIMAL)
fcsv.writerow(value.get('header', []))
fcsv.writerows(value.get('rows', []))
return fout.getvalue()
# # Data Table Function
# def _DTstrftime(chain):
# ret = chain and datetime.strftime(chain, '%d-%m-%Y')
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTnumber_format(chain):
# import locale
# locale.setlocale(locale.LC_ALL, 'id_ID.utf8')
# ret = locale.format("%d", chain, grouping=True)
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTupper(chain):
# ret = chain.upper()
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTstatus(chain):
# if chain:
# ret = 'Aktif'
# else:
# ret = 'Mati'
# if ret:
# return ret
# else:
# return chain
##########
# String #
##########
def clean(s):
r = ''
for ch in s:
asc = ord(ch)
if asc > 126 or asc < 32:
ch = ' '
r += ch
return r
# def to_str(s):
# s = s or ''
# s = str(s)
# return clean(s)
def left(s, width):
s = to_str(s)
return s.ljust(width)[:width]
def right(s, width):
s = to_str(s)
return s.zfill(width)[:width]
##################
# Data Structure #
##################
class FixLength(object):
def __init__(self, struct):
self.struct = self.fields = None
self.set_struct(struct)
def set_struct(self, struct):
self.struct = struct
self.fields = {}
for s in struct:
name = s[0]
size = s[1:] and s[1] or 1
typ = s[2:] and s[2] or 'A' # N: numeric, A: alphanumeric
self.fields[name] = {'value': None, 'type': typ, 'size': size}
def set(self, name, value):
self.fields[name]['value'] = value
def get(self, name):
v = self.fields[name]['value']
if v:
return v.strip()
return ''
def __setitem__(self, name, value):
self.set(name, value)
def __getitem__(self, name):
return self.get(name)
def get_raw(self):
return self.get_spliter("")
def get_raw_dotted(self):
return self.get_dotted()
def get_value(self, name, typ):
v = self.fields[name]['value']
if v and typ == 'N':
i = int(v)
if v == i:
v = i
return v
def get_spliter(self, ch):
"""
2022-09-12 Perbaikan return jika ch is none
"""
s = ''
for name, size, typ in self.struct:
v = self.get_value(name, typ)
pad_func = typ == 'N' and right or left
s += pad_func(v, size)
s += ch
if ch:
return s[:-1]
return s
def get_dotted(self):
return self.get_spliter('.')
def set_raw(self, raw):
awal = 0
for t in self.struct:
name = t[0]
size = t[1:] and t[1] or 1
akhir = awal + size
value = raw[awal:akhir]
if not value:
return
self.set(name, value)
awal += size
return True
def from_dict(self, d):
for name in self.fields:
if name in d and d[name]:
value = d[name]
self.set(name, value)
def to_dict(self):
r = {}
for name in self.fields:
r[name] = self.get(name)
return r
def length(self):
return len(self.get_raw())
import decimal
class Encoder(JSONEncoder):
"""Extends JSONEncoder for date and decimal types."""
def push_date(self, d):
"""Serialize the given datetime.date object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
return "%04d-%02d-%02d" % (d.year, d.month, d.day)
def push_timedelta(self, t):
"""Serialize the given datetime.timedelta object to a JSON string."""
days = t.days
if days < 0:
minus = "-"
days = -days - 1
seconds = 24 * 60 * 60 - t.seconds
else:
minus = ""
seconds = t.seconds
secs = seconds % 60
seconds /= 60
mins = seconds % 60
hours = seconds / 60
return "%s%d:%02d:%02d:%02d" % (minus, days, hours, mins, secs)
def push_time(self, t):
"""Serialize the given datetime.time object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
return "%02d:%02d:%02d" % (t.hour, t.minute, t.second)
def push_datetime(self, dt):
"""Serialize the given datetime.datetime object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
# Don't use strftime because that can't handle dates before 1900.
return ("%04d-%02d-%02dT%02d:%02d:%02d" %
(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second))
def default(self, o):
# We MUST check for a datetime.datetime instance before datetime.date.
# datetime.datetime is a subclass of datetime.date, and therefore
# instances of it are also instances of datetime.date.
if isinstance(o, datetime.datetime):
return self.push_datetime(o)
elif isinstance(o, datetime.date):
return self.push_date(o)
elif isinstance(o, datetime.timedelta):
return self.push_timedelta(o)
elif isinstance(o, datetime.time):
return self.push_time(o)
elif isinstance(o, decimal.Decimal):
return str(o)
else:
return JSONEncoder.default(self, o)
STATUS = (
(1, 'Aktif'),
(0, 'Pasif')
)
INVOICE_STATUS = (
(0, 'Draft'),
(1, 'Approved'),
)
def pbb_biller_split():
settings = get_settings()
if not 'pbb_biller' in settings:
return None, None
biller = re.sub('\D', '', settings['pbb_biller'])
return biller[:2], biller[2:]
def clear_null_value(values):
# digunakan untuk menghapus dictionary yang value nya null
tobe_del = []
for value in values:
if not values[value]:
tobe_del.append(value)
for value in tobe_del:
del values[value]
return values
def replace_char(text, start, stop, char="X"):
x = char * (stop - start + 1)
s = text[:start] + x + text[stop:]
# print "*** DEBUG ", text, s
return s
##################
# Email #
##################
# import smtplib
# from email.MIMEMultipart import MIMEMultipart
# from email.MIMEText import MIMEText
# from email.MIMEBase import MIMEBase
# import base64
SCOPES = [
'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/userinfo.profile',
# Add other requested scopes.
]
# def send_email(to_addr_list, subject, message, cc_addr_list=None,from_addr=None,
# login=None, password=None, file=None,
# smtp_server='smtp.gmail.com:587'):
# settings = get_settings()
# if settings['devel']=='true':
# return
#
# if not login:
#
# login = 'center.email' in settings and settings['center.email'] or None
# # if login[-9:]=="gmail.com":
# # return send_gmsg(to=to_addr_list, subject=subject, message_text=message, file=file,)
#
# if not password:
# password = 'center.email_password' in settings and settings['center.email_password'] or None
# if not login or not password:
# return 'Error smtp login or Password'
#
# if 'center.smtp_server' in settings and settings['center.smtp_server']:
# smtp_server = settings['center.smtp_server']
#
# if not from_addr:
# from_addr = login
#
# header = 'From: %s\r\n' % from_addr
# header += 'To: %s \r\n' % ','.join(to_addr_list)
# if cc_addr_list:
# header += 'Cc: %s \r\n' % ','.join(cc_addr_list)
# header += 'Subject: %s \r\n' % subject
# message = header + message
# smtp, port = smtp_server.split(':')
# port = int(port)
#
# #print '>>>>>>>>>>>>', smtp, port, login,password
# try:
# #print smtp_server, smtp, port
# server = smtplib.SMTP(smtp,port) #smtp_server)
# server.ehlo()
# if port==587:
# server.starttls()
#
# server.login(login,password)
# problems = server.sendmail(from_addr, to_addr_list, message)
# server.quit()
# return
# except Exception as e:
# return str(e)
#
# def create_message(sender, to, subject, message_text, file=None):
# """Create a message for an email.
#
# Args:
# sender: Email address of the sender.
# to: Email address of the receiver.
# subject: The subject of the email message.
# message_text: The text of the email message.
# file: The path to the file to be attached.
#
# Returns:
# An object containing a base64url encoded email object.
# """
# if file:
# message = MIMEMultipart()
# msg = MIMEText(message_text)
# else:
# message = MIMEText(message_text)
#
# message['to'] = to
# message['from'] = sender
# message['subject'] = subject
#
# if file:
# message.attach(msg)
# content_type, encoding = mimetypes.guess_type(file)
#
# if content_type is None or encoding is not None:
# content_type = 'application/octet-stream'
# main_type, sub_type = content_type.split('/', 1)
# if main_type == 'text':
# fp = open(file, 'rb')
# msg = MIMEText(fp.read(), _subtype=sub_type)
# fp.close()
# elif main_type == 'image':
# fp = open(file, 'rb')
# msg = MIMEImage(fp.read(), _subtype=sub_type)
# fp.close()
# elif main_type == 'audio':
# fp = open(file, 'rb')
# msg = MIMEAudio(fp.read(), _subtype=sub_type)
# fp.close()
# else:
# fp = open(file, 'rb')
# msg = MIMEBase(main_type, sub_type)
# msg.set_payload(fp.read())
# fp.close()
# filename = os.path.basename(file)
# msg.add_header('Content-Disposition', 'attachment', filename=filename)
# message.attach(msg)
#
# return {'raw': base64.urlsafe_b64encode(message.as_string())}
#
#
#
# def get_service():
# from googleapiclient.discovery import build
# from httplib2 import Http
# from oauth2client import file, client, tools
# """Shows basic usage of the Gmail API.
# Lists the user's Gmail labels.
# """
# # The file token.json stores the user's access and refresh tokens, and is
# # created automatically when the authorization flow completes for the first
# # time.
# store = file.Storage('token.json')
# creds = store.get()
# if not creds or creds.invalid:
# flow = client.flow_from_clientsecrets('./credentials.json', SCOPES)
# creds = tools.run_flow(flow, store)
# return build('gmail', 'v1', http=creds.authorize(Http()))
#
# def send_gmsg(to, subject, message_text, file=None):
# settings = get_settings()
# sender = 'center.email' in settings and settings['center.email'] or None
#
# message = create_message(sender, to, subject, message_text, file=None)
# service = get_service()
# try:
# message = (service.users().messages().send(userId=user_id, body=message)
# .execute())
# return message
# except errors.HttpError as error:
# pass
#
# http://python.web.id/angka-terbilang-pada-python/#.VQpS8s2sXQo
satuan = ['', 'satu', 'dua', 'tiga', 'empat', 'lima', 'enam', 'tujuh',
'delapan', 'sembilan', 'sepuluh', 'sebelas']
def terbilang_(n):
n = int(n)
if n >= 1000000000:
hasil = terbilang_(n / 1000000000) + ['milyar'] + terbilang_(
n % 100000000)
elif n > 1000000:
hasil = terbilang_(n / 1000000) + ['juta'] + terbilang_(n % 1000000)
elif n >= 2000:
hasil = terbilang_(n / 1000) + ['ribu'] + terbilang_(n % 1000)
elif n >= 1000:
hasil = ['seribu'] + terbilang_(n - 1000)
elif n >= 200:
hasil = terbilang_(n / 100) + ['ratus'] + terbilang_(n % 100)
elif n >= 100:
hasil = ['seratus'] + terbilang_(n - 100)
elif n >= 20:
hasil = terbilang_(n / 10) + ['puluh'] + terbilang_(n % 10)
elif n >= 12:
hasil = terbilang_(n % 10) + ['belas']
else:
hasil = [satuan[n]]
print('hasil')
return hasil
def terbilang(n):
if n == 0:
return 'nol'
t = terbilang_(n)
while '' in t:
t.remove('')
return ' '.join(t)
def number_only(value):
return re.sub('\D', "", value)
def set_user_log(message, request, logobj=None, user_name=None):
if not logobj:
logobj = log
if not request:
logobj.info("Request not Set")
return
if not user_name:
user_name = request and request.user and request.user.user_name or None
if not user_name:
logobj.info("User Name not Set")
return
addr = request.client_addr
message = "User {} at Addr {}: {}".format(user_name, addr, message)
logobj.warning(message)
def create_static_path(config, folder, url, cache_max_age=3600):
log.debug("Folder to create: %s", folder)
if not os.path.exists(folder):
try:
os.makedirs(folder)
except Exception as e:
log.debug("User: %s", os.path.expanduser("~"))
log.debug("Error: %s", str(e))
config.add_static_view(url, path=folder, cache_max_age=cache_max_age)
def includeme(config):
config.add_translation_dirs('opensipkd.tools:locale/')
from __future__ import print_function
import os
import re
import mimetypes
import csv
import calendar
import datetime
from random import choice
from string import (ascii_uppercase, ascii_lowercase, digits, )
import locale
import colander
import pytz
import io
from pyramid.httpexceptions import HTTPNotFound
from pyramid.threadlocal import get_current_registry
from json import JSONEncoder
import logging
log = logging.getLogger(__name__)
################
# Phone number #
################
MSISDN_ALLOW_CHARS = map(lambda x: str(x), range(10)) # + ['+']
def get_msisdn(msisdn, country='+62'):
"""
Digunakan untuk pengecekan no telp suatu negara
:param msisdn:
:param country:
:return: output kode negara+no msisdn
"""
# for ch in msisdn:
# if ch not in MSISDN_ALLOW_CHARS:
# raise Exception(f'{ch} not in {print(x for x in range(10))}')
try:
i = int(msisdn)
except ValueError as e:
log.info(e)
return
if not i:
raise Exception('MSISDN must filled')
if len(str(i)) < 7:
raise Exception('MSISDN less then 7 number')
if re.compile(r'^\+').search(msisdn):
return msisdn
if re.compile(r'^0').search(msisdn):
return '%s%s' % (country, msisdn.lstrip('0'))
################
# Money format #
################
def should_int(value):
int_ = int(value)
if int_ == value:
return int_
return value
def thousand(value, float_count=None):
"""
Memisahkan ribuan
:param value:
:param float_count:
:return: string
"""
if not value:
return "0"
if float_count is None: # autodetection
if type(value) in (int, float):
float_count = 0
else:
float_count = 2
return locale.format_string('%%.%df' % float_count, value, True)
def money(value, float_count=None, currency=None):
"""
Memisahkan ribuan dan menambahkan nama mata uang
:param value:
:param float_count:
:param currency:
:return:
"""
if value < 0:
v = abs(value)
format_ = '(%s)'
else:
v = value
format_ = '%s'
if currency is None:
currency = locale.localeconv()['currency_symbol']
s = ' '.join([currency, thousand(v, float_count)])
return format_ % s
def round_up(n):
i = int(n)
if n == i:
return i
if n > i:
return i + 1
return i - 1
###########
# Pyramid #
###########
def get_settings():
return get_current_registry().settings
def get_params(params, alternate=None):
"""
Digunakan untuk mengambil nilai dari konfigurasi sesuai params yang disebut
:param params: variable
:param alternate: default apabila tidak ditemukan data/params
:return: value
contoh penggunaan:
get_params('devel', False)
"""
settings = get_settings()
result = settings and params in settings and settings[
params].strip() or None
return result and result or alternate
def devel():
settings = get_settings()
is_devel = 'devel' in settings and settings['devel'] and True or False
print("DEBUG>>", is_devel)
return is_devel
def get_timezone():
settings = get_settings()
return pytz.timezone(settings['timezone'])
def get_modules():
settings = get_settings()
return settings['modules'].split(',')
########
# Time #
########
one_second = datetime.timedelta(1.0 / 24 / 60 / 60)
DateType = type(datetime.date.today())
DateTimeType = type(datetime.datetime.now())
TimeZoneFile = '/etc/timezone'
if os.path.exists(TimeZoneFile):
DefaultTimeZone = open(TimeZoneFile).read().strip()
else:
DefaultTimeZone = 'Asia/Jakarta'
def as_timezone(tz_date):
localtz = get_timezone()
if not tz_date.tzinfo:
tz_date = create_datetime(tz_date.year, tz_date.month, tz_date.day,
tz_date.hour, tz_date.minute, tz_date.second,
tz_date.microsecond)
return tz_date.astimezone(localtz)
def create_datetime(year, month, day, hour=0, minute=7, second=0,
microsecond=0):
tz = get_timezone()
dt = datetime.datetime(year, month, day, hour, minute, second, microsecond)
return tz.localize(dt)
def create_date(year, month, day):
return create_datetime(year, month, day)
def create_now():
tz = get_timezone()
return datetime.datetime.now(tz)
def date_from_str(value):
separator = None
value = value.split()[0] # dd-mm-yyyy HH:MM:SS
for s in ['-', '/', '.']:
if value.find(s) > -1:
separator = s
break
if separator:
t = list(map(lambda x: int(x), value.split(separator)))
y, m, d = t[2], t[1], t[0]
if d > 999: # yyyy-mm-dd
y, d = d, y
else:
y, m, d = int(value[:4]), int(value[4:6]), int(value[6:])
return datetime.date(y, m, d)
def time_from_str(value):
# separator = ":"
value = value.split()[1] # dd-mm-yyyy HH:MM:SS
# return value.strptime('%H:%M:%S')
h, m, s = value.split(":")
# return datetime.time(h, m, s)
return datetime.timedelta(hours=int(h), minutes=int(m), seconds=int(s))
def datetime_from_str(value):
# separator = None
dt = date_from_str(value)
tm = time_from_str(value)
return datetime.datetime(dt.year, dt.month, dt.day)+tm
def dmy(tgl):
return tgl.strftime('%d-%m-%Y')
def hms(tgl):
return tgl.strftime('%H:%M:%S')
def dmy_to_date(tgl):
return datetime.datetime.strptime(tgl, '%d-%m-%Y')
def dmyhms(tgl):
return tgl.strftime('%d-%m-%Y %H:%M:%S')
def ymd(tgl):
return tgl.strftime('%Y-%m-%d')
def ymdhms(tgl):
return tgl.strftime('%Y-%m-%d %H:%M:%S')
def dMy(tgl):
return str(tgl.day) + ' ' + NAMA_BULAN[tgl.month][1] + ' ' + str(tgl.year)
def tampil_bulan(bulan):
return NAMA_BULAN[bulan][1]
def next_month(year, month):
if month == 12:
month = 1
year += 1
else:
month += 1
return year, month
def best_date(year, month, day):
try:
return datetime.date(year, month, day)
except ValueError:
last_day = calendar.monthrange(year, month)[1]
return datetime.date(year, month, last_day)
def next_month_day(year, month, day):
year, month = next_month(year, month)
return best_date(year, month, day)
def count_day(date):
date = str(date) # 2018-01-02 list
year = int(date[:4])
if date[5] == '0':
month = int(date[6])
else:
month = int(date[5:7])
count_day = calendar.monthrange(year, month)
return int(count_day[1])
NAMA_BULAN = (
(0, "--Bulan--"),
(1, "Januari", "Jan"),
(2, "Februari", "Feb"),
(3, "Maret", "Mar"),
(4, "April", "Apr"),
(5, "Mei", "Mei"),
(6, "Juni", "Jun"),
(7, "Juli", "Jul"),
(8, "Agustus", "Agu"),
(9, "September", "Sep"),
(10, "Oktober", "Okt"),
(11, "Nopember", "Nov"),
(12, "Desember", "Des"),
)
##########
# String #
##########
def one_space(s):
s = s.strip()
while s.find(' ') > -1:
s = s.replace(' ', ' ')
return s
def to_str(v):
typ = type(v)
if typ == DateType:
return dmy(v)
if typ == DateTimeType:
return dmyhms(v)
if v == 0:
return '0'
if typ == str:
return v.strip()
elif typ == bool:
return v and '1' or '0'
return v and str(v) or ''
def dict_to_str(d):
r = {}
for key in d:
val = d[key]
r[key] = to_str(val)
return r
def split(s, c=4):
r = []
while s:
t = s[:c]
r.append(t)
s = s[c:]
return ' '.join(r)
def url_join(*args):
url = ''
for arg in args:
if arg:
aurl = arg.strip('/')
url = url + aurl + '/'
url = url.rstrip('/')
return url
def upper_dict(d):
new_dict = dict((str(k).upper(), v) for k, v in d.items())
return new_dict
########
# File #
########
# http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
def get_random_string(width=6):
return ''.join(choice(ascii_uppercase + ascii_lowercase + digits) \
for _ in range(width))
def get_random_number(width=12):
return ''.join(choice(digits) \
for _ in range(width))
def get_ext(filename):
return os.path.splitext(filename)[-1]
def get_filename(filename):
return "".join(os.path.splitext(filename)[:-1])
def file_type(filename):
ctype, encoding = mimetypes.guess_type(filename)
if ctype is None or encoding is not None:
ctype = 'application/octet-stream'
return ctype
class SaveFile(object):
def __init__(self, dir_path):
self.dir_path = dir_path
if not os.path.exists(dir_path):
os.makedirs(dir_path)
# Unchanged file extension, and make file prefix unique with sequential
# number.
def create_fullpath(self, ext=''):
while True:
filename = get_random_string() + ext
full_path = os.path.join(self.dir_path, filename)
if not os.path.exists(full_path):
return full_path
def save(self, content, ext='', filename=None):
if filename:
fullpath = os.path.join(self.dir_path, filename)
else:
fullpath = self.create_fullpath(ext=ext)
f = open(fullpath, 'wb')
if type(content) == io.BytesIO:
f.write(content.getbuffer())
else:
f.write(content)
f.close()
return fullpath
class InvalidExtension(Exception):
def __init__(self, exts):
self.error = f"Supported extensions {exts}"
class MemoryTmpStore(dict):
""" Instances of this class implement the
:class:`defWorm.interfaces.FileUploadTempStore` interface"""
def preview_url(self, uid):
return None
mem_tmp_store = MemoryTmpStore()
img_exts = ['.png', '.jpg', '.pdf', '.jpeg']
def image_validator(node, value):
ext = get_ext(value["filename"])
if ext not in img_exts:
raise colander.Invalid(node,
f'Extension harus salahsatu dari {img_exts}')
def file_response(request, f, filename, type):
response = request.response
response.content_type = str(type)
response.content_disposition = 'filename=' + filename
response.write(f.read())
return response
class Upload(SaveFile):
def save_to_file(self, input_file, ext, filename=None):
if filename:
fullpath = os.path.join(self.dir_path, filename, ext)
else:
fullpath = self.create_fullpath(ext)
output_file = open(fullpath, 'wb')
input_file.seek(0)
while True:
data = input_file.read(2 << 16)
if not data:
break
output_file.write(data)
output_file.close()
return fullpath
def save(self, request, name, exts=None, filename=None):
input_file = request.POST[name].file
ext = get_ext(request.POST[name].filename)
if exts and ext not in exts:
raise InvalidExtension(exts)
filename = self.save_to_file(input_file, ext, filename=filename)
head, filename = os.path.split(filename)
return filename
def save_fp(self, upload):
if 'fp' not in upload or upload['fp'] == b'':
if "filename" in upload:
return upload['filename']
return
filename = upload['filename']
input_file = upload['fp']
ext = get_ext(filename)
return self.save_to_file(input_file, ext)
def saves(self, uploads):
d = {}
for upload in uploads:
if 'fp' not in upload or upload['fp'] == b'':
continue
filename = upload['filename']
# input_file = upload['fp']
# ext = get_ext(filename)
# d[filename] = self.save_to_file(input_file, ext)
d[filename] = self.save_fp(upload)
return d
def download(self, request, filename):
out_filename = os.path.join(self.dir_path, filename)
if not os.path.exists(out_filename):
raise HTTPNotFound
ext = get_ext(filename)[1:]
if ext in ['gif', 'png', 'jpeg', 'jpg']:
ext = 'image/' + ext
with open(out_filename, 'rb') as f:
return file_response(request, f, filename, ext)
class UploadBin(Upload):
"""
Compatibility to previous
"""
pass
class CSVRenderer(object):
def __init__(self, info):
pass
def __call__(self, value, system):
""" Returns a plain CSV-encoded string with content-type
``text/csv``. The content-type may be overridden by
setting ``request.response.content_type``."""
request = system.get('request')
if request is not None:
response = request.response
ct = response.content_type
if ct == response.default_content_type:
response.content_type = 'text/csv'
# error ketika open csv menggunakan bytes
# karena tak mau hapus, maka dibikin try
try:
fout = io.BytesIO() #
fcsv = csv.writer(fout, delimiter=',', quotechar='"',
quoting=csv.QUOTE_MINIMAL)
fcsv.writerow(value.get('header', []))
fcsv.writerows(value.get('rows', []))
except:
fout = io.StringIO()
fcsv = csv.writer(fout, delimiter=',', quotechar='"',
quoting=csv.QUOTE_MINIMAL)
fcsv.writerow(value.get('header', []))
fcsv.writerows(value.get('rows', []))
return fout.getvalue()
# # Data Table Function
# def _DTstrftime(chain):
# ret = chain and datetime.strftime(chain, '%d-%m-%Y')
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTnumber_format(chain):
# import locale
# locale.setlocale(locale.LC_ALL, 'id_ID.utf8')
# ret = locale.format("%d", chain, grouping=True)
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTupper(chain):
# ret = chain.upper()
# if ret:
# return ret
# else:
# return chain
#
#
# def _DTstatus(chain):
# if chain:
# ret = 'Aktif'
# else:
# ret = 'Mati'
# if ret:
# return ret
# else:
# return chain
##########
# String #
##########
def clean(s):
r = ''
for ch in s:
asc = ord(ch)
if asc > 126 or asc < 32:
ch = ' '
r += ch
return r
# def to_str(s):
# s = s or ''
# s = str(s)
# return clean(s)
def left(s, width):
s = to_str(s)
return s.ljust(width)[:width]
def right(s, width):
s = to_str(s)
return s.zfill(width)[:width]
##################
# Data Structure #
##################
class FixLength(object):
def __init__(self, struct):
self.struct = self.fields = None
self.set_struct(struct)
def set_struct(self, struct):
self.struct = struct
self.fields = {}
for s in struct:
name = s[0]
size = s[1:] and s[1] or 1
typ = s[2:] and s[2] or 'A' # N: numeric, A: alphanumeric
self.fields[name] = {'value': None, 'type': typ, 'size': size}
def set(self, name, value):
self.fields[name]['value'] = value
def get(self, name):
v = self.fields[name]['value']
if v:
return v.strip()
return ''
def __setitem__(self, name, value):
self.set(name, value)
def __getitem__(self, name):
return self.get(name)
def get_raw(self):
return self.get_spliter("")
def get_raw_dotted(self):
return self.get_dotted()
def get_value(self, name, typ):
v = self.fields[name]['value']
if v and typ == 'N':
i = int(v)
if v == i:
v = i
return v
def get_spliter(self, ch):
"""
2022-09-12 Perbaikan return jika ch is none
"""
s = ''
for name, size, typ in self.struct:
v = self.get_value(name, typ)
pad_func = typ == 'N' and right or left
s += pad_func(v, size)
s += ch
if ch:
return s[:-1]
return s
def get_dotted(self):
return self.get_spliter('.')
def set_raw(self, raw):
awal = 0
for t in self.struct:
name = t[0]
size = t[1:] and t[1] or 1
akhir = awal + size
value = raw[awal:akhir]
if not value:
return
self.set(name, value)
awal += size
return True
def from_dict(self, d):
for name in self.fields:
if name in d and d[name]:
value = d[name]
self.set(name, value)
def to_dict(self):
r = {}
for name in self.fields:
r[name] = self.get(name)
return r
def length(self):
return len(self.get_raw())
import decimal
class Encoder(JSONEncoder):
"""Extends JSONEncoder for date and decimal types."""
def push_date(self, d):
"""Serialize the given datetime.date object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
return "%04d-%02d-%02d" % (d.year, d.month, d.day)
def push_timedelta(self, t):
"""Serialize the given datetime.timedelta object to a JSON string."""
days = t.days
if days < 0:
minus = "-"
days = -days - 1
seconds = 24 * 60 * 60 - t.seconds
else:
minus = ""
seconds = t.seconds
secs = seconds % 60
seconds /= 60
mins = seconds % 60
hours = seconds / 60
return "%s%d:%02d:%02d:%02d" % (minus, days, hours, mins, secs)
def push_time(self, t):
"""Serialize the given datetime.time object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
return "%02d:%02d:%02d" % (t.hour, t.minute, t.second)
def push_datetime(self, dt):
"""Serialize the given datetime.datetime object to a JSON string."""
# Default is ISO 8601 compatible (standard notation).
# Don't use strftime because that can't handle dates before 1900.
return ("%04d-%02d-%02dT%02d:%02d:%02d" %
(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second))
def default(self, o):
# We MUST check for a datetime.datetime instance before datetime.date.
# datetime.datetime is a subclass of datetime.date, and therefore
# instances of it are also instances of datetime.date.
if isinstance(o, datetime.datetime):
return self.push_datetime(o)
elif isinstance(o, datetime.date):
return self.push_date(o)
elif isinstance(o, datetime.timedelta):
return self.push_timedelta(o)
elif isinstance(o, datetime.time):
return self.push_time(o)
elif isinstance(o, decimal.Decimal):
return str(o)
else:
return JSONEncoder.default(self, o)
STATUS = (
(1, 'Aktif'),
(0, 'Pasif')
)
INVOICE_STATUS = (
(0, 'Draft'),
(1, 'Approved'),
)
def pbb_biller_split():
settings = get_settings()
if not 'pbb_biller' in settings:
return None, None
biller = re.sub('\D', '', settings['pbb_biller'])
return biller[:2], biller[2:]
def clear_null_value(values):
# digunakan untuk menghapus dictionary yang value nya null
tobe_del = []
for value in values:
if not values[value]:
tobe_del.append(value)
for value in tobe_del:
del values[value]
return values
def replace_char(text, start, stop, char="X"):
x = char * (stop - start + 1)
s = text[:start] + x + text[stop:]
# print "*** DEBUG ", text, s
return s
##################
# Email #
##################
# import smtplib
# from email.MIMEMultipart import MIMEMultipart
# from email.MIMEText import MIMEText
# from email.MIMEBase import MIMEBase
# import base64
SCOPES = [
'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/userinfo.profile',
# Add other requested scopes.
]
# def send_email(to_addr_list, subject, message, cc_addr_list=None,from_addr=None,
# login=None, password=None, file=None,
# smtp_server='smtp.gmail.com:587'):
# settings = get_settings()
# if settings['devel']=='true':
# return
#
# if not login:
#
# login = 'center.email' in settings and settings['center.email'] or None
# # if login[-9:]=="gmail.com":
# # return send_gmsg(to=to_addr_list, subject=subject, message_text=message, file=file,)
#
# if not password:
# password = 'center.email_password' in settings and settings['center.email_password'] or None
# if not login or not password:
# return 'Error smtp login or Password'
#
# if 'center.smtp_server' in settings and settings['center.smtp_server']:
# smtp_server = settings['center.smtp_server']
#
# if not from_addr:
# from_addr = login
#
# header = 'From: %s\r\n' % from_addr
# header += 'To: %s \r\n' % ','.join(to_addr_list)
# if cc_addr_list:
# header += 'Cc: %s \r\n' % ','.join(cc_addr_list)
# header += 'Subject: %s \r\n' % subject
# message = header + message
# smtp, port = smtp_server.split(':')
# port = int(port)
#
# #print '>>>>>>>>>>>>', smtp, port, login,password
# try:
# #print smtp_server, smtp, port
# server = smtplib.SMTP(smtp,port) #smtp_server)
# server.ehlo()
# if port==587:
# server.starttls()
#
# server.login(login,password)
# problems = server.sendmail(from_addr, to_addr_list, message)
# server.quit()
# return
# except Exception as e:
# return str(e)
#
# def create_message(sender, to, subject, message_text, file=None):
# """Create a message for an email.
#
# Args:
# sender: Email address of the sender.
# to: Email address of the receiver.
# subject: The subject of the email message.
# message_text: The text of the email message.
# file: The path to the file to be attached.
#
# Returns:
# An object containing a base64url encoded email object.
# """
# if file:
# message = MIMEMultipart()
# msg = MIMEText(message_text)
# else:
# message = MIMEText(message_text)
#
# message['to'] = to
# message['from'] = sender
# message['subject'] = subject
#
# if file:
# message.attach(msg)
# content_type, encoding = mimetypes.guess_type(file)
#
# if content_type is None or encoding is not None:
# content_type = 'application/octet-stream'
# main_type, sub_type = content_type.split('/', 1)
# if main_type == 'text':
# fp = open(file, 'rb')
# msg = MIMEText(fp.read(), _subtype=sub_type)
# fp.close()
# elif main_type == 'image':
# fp = open(file, 'rb')
# msg = MIMEImage(fp.read(), _subtype=sub_type)
# fp.close()
# elif main_type == 'audio':
# fp = open(file, 'rb')
# msg = MIMEAudio(fp.read(), _subtype=sub_type)
# fp.close()
# else:
# fp = open(file, 'rb')
# msg = MIMEBase(main_type, sub_type)
# msg.set_payload(fp.read())
# fp.close()
# filename = os.path.basename(file)
# msg.add_header('Content-Disposition', 'attachment', filename=filename)
# message.attach(msg)
#
# return {'raw': base64.urlsafe_b64encode(message.as_string())}
#
#
#
# def get_service():
# from googleapiclient.discovery import build
# from httplib2 import Http
# from oauth2client import file, client, tools
# """Shows basic usage of the Gmail API.
# Lists the user's Gmail labels.
# """
# # The file token.json stores the user's access and refresh tokens, and is
# # created automatically when the authorization flow completes for the first
# # time.
# store = file.Storage('token.json')
# creds = store.get()
# if not creds or creds.invalid:
# flow = client.flow_from_clientsecrets('./credentials.json', SCOPES)
# creds = tools.run_flow(flow, store)
# return build('gmail', 'v1', http=creds.authorize(Http()))
#
# def send_gmsg(to, subject, message_text, file=None):
# settings = get_settings()
# sender = 'center.email' in settings and settings['center.email'] or None
#
# message = create_message(sender, to, subject, message_text, file=None)
# service = get_service()
# try:
# message = (service.users().messages().send(userId=user_id, body=message)
# .execute())
# return message
# except errors.HttpError as error:
# pass
#
# http://python.web.id/angka-terbilang-pada-python/#.VQpS8s2sXQo
satuan = ['', 'satu', 'dua', 'tiga', 'empat', 'lima', 'enam', 'tujuh',
'delapan', 'sembilan', 'sepuluh', 'sebelas']
def terbilang_(n):
n = int(n)
if n >= 1000000000:
hasil = terbilang_(n / 1000000000) + ['milyar'] + terbilang_(
n % 100000000)
elif n > 1000000: #commit by taufik
hasil = terbilang_(n / 1000000) + ['juta'] + terbilang_(n % 1000000)
elif n >= 2000:
hasil = terbilang_(n / 1000) + ['ribu'] + terbilang_(n % 1000)
elif n >= 1000:
hasil = ['seribu'] + terbilang_(n - 1000)
elif n >= 200:
hasil = terbilang_(n / 100) + ['ratus'] + terbilang_(n % 100)
elif n >= 100:
hasil = ['seratus'] + terbilang_(n - 100)
elif n >= 20:
hasil = terbilang_(n / 10) + ['puluh'] + terbilang_(n % 10)
elif n >= 12:
hasil = terbilang_(n % 10) + ['belas']
else:
hasil = [satuan[n]]
print('hasil')
return hasil
def terbilang(n):
if n == 0:
return 'nol'
t = terbilang_(n)
while '' in t:
t.remove('')
return ' '.join(t)
def number_only(value):
return re.sub('\D', "", value)
def set_user_log(message, request, logobj=None, user_name=None):
if not logobj:
logobj = log
if not request:
logobj.info("Request not Set")
return
if not user_name:
user_name = request and request.user and request.user.user_name or None
if not user_name:
logobj.info("User Name not Set")
return
addr = request.client_addr
message = "User {} at Addr {}: {}".format(user_name, addr, message)
logobj.warning(message)
def create_static_path(config, folder, url, cache_max_age=3600):
log.debug("Folder to create: %s", folder)
if not os.path.exists(folder):
try:
os.makedirs(folder)
except Exception as e:
log.debug("User: %s", os.path.expanduser("~"))
log.debug("Error: %s", str(e))
config.add_static_view(url, path=folder, cache_max_age=cache_max_age)
def includeme(config):
config.add_translation_dirs('opensipkd.tools:locale/')
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!