Commit 5248d374 by Owo Sugiana

Kali pertama

0 parents
*.pem
*.egg-info
*.vscode
build
__pycache__
SNAP di Pyramid
===============
Ini adalah landasan untuk penerapan `SNAP <https://apidevportal.aspi-indonesia.or.id/>`_
di web framework `Pyramid <https://trypyramid.com>`_.
Kita bisa memasangnya langsung dengan cara::
$ ~/env/bin/pip install git+https://git.opensipkd.com/sugiana/pyramid-snap
Jika Anda ingin mengembangkannya maka unduh terlebih dahulu::
$ git clone https://git.opensipkd.com/sugiana/pyramid-snap
$ cd pyramid-snap
$ ~/env/bin/pip install -e .[dev]
Lakukan uji coba::
$ cd tests
$ ~/env/bin/pytest
Server
------
Merujuk pada dokumentasi `Standar Keamanan SNAP <https://apidevportal.aspi-indonesia.or.id/api-services/keamanan>`_
maka client harus membuat *private key* dan *public key*. Private key digunakan
client untuk meng-encrypt signature::
$ openssl genrsa -out private.pem 2048
Public key digunakan server untuk memvalidasi signature::
$ openssl rsa -in private.pem -pubout -out public.pem
Di paket ini kedua file tersebut sudah disediakan. Jalankan server uji
coba::
$ cd pyramid-snap/example/server
$ ~/env/bin/pserve development.ini
Nanti dia listen di port 6543. Selanjutnya buka terminal lain untuk
menjalankan client::
$ cd pyramid-snap/example/client
$ ~/env/bin/python create_va --private-file=../server/private.pem
Web server tadi adalah simulator Winpay server. Jadi kita juga bisa mengujinya
dengan `Winpay Client <https://git.opensipkd.com/sugiana/winpay-client/>`_::
$ ~/env/bin/winpay_create_va --private-file=../server/private.pem \
--partner-id=1234 --customer-no=08123456789 --va-name="Iwan Gunawan" \
--amount=10000 --channel=MANDIRI \
--url=http://localhost:6543/v1.0/transfer-va/create-va
Semoga dipahami.
\ No newline at end of file
import sys
import json
from datetime import (
datetime,
timedelta,
)
from argparse import ArgumentParser
from base64 import b64encode
import requests
import pytz
from winpay.signature import generator
def time_to_str(t: datetime) -> str:
s = t.strftime('%Y-%m-%dT%H:%M:%S%z')
return s[:-2] + ':' + s[-2:]
url = 'http://localhost:6543/v1.0/transfer-va/create-va'
help_url = f'default {url}'
trx_id = datetime.now().strftime('%H%M%S')
help_trx_id = f'default {trx_id}'
partner_id = '1234'
help_partner_id = f'default {partner_id}'
customer_no = '081234567890'
help_customer_no = f'default {customer_no}'
va_name = 'Jokul Doe'
help_va_name = f'default {va_name}'
amount = 10000
help_amount = f'default {amount}'
expired_days = 1
help_expired = f'default {expired_days}'
channels = [
'BRI', 'BNI', 'MANDIRI', 'PERMATA', 'BSI', 'MUAMALAT', 'BCA', 'CIMB',
'SINARMAS', 'BNC', 'INDOMARET', 'ALFAMART']
channel = channels[0]
help_channel = f'default {channel}'
pars = ArgumentParser()
pars.add_argument('--private-file', required=True)
pars.add_argument('--partner-id', default=partner_id, help=help_partner_id)
pars.add_argument('--customer-no', default=customer_no, help=help_customer_no)
pars.add_argument('--va-name', default=va_name, help=help_va_name)
pars.add_argument('--amount', type=int, default=amount, help=help_amount)
pars.add_argument(
'--channel', choices=channels, default=channel, help=help_channel)
pars.add_argument('--url', default=url, help=help_url)
pars.add_argument(
'--expired-days', type=int, default=expired_days, help=help_expired)
pars.add_argument('--trx-id', default=trx_id, help=help_trx_id)
option = pars.parse_args(sys.argv[1:])
one_day = timedelta(1)
jakarta_tz = pytz.timezone('Asia/Jakarta')
now = datetime.now(jakarta_tz)
now_str = time_to_str(now)
expired_date = now + one_day * option.expired_days
expired_str = time_to_str(expired_date)
data = dict(
virtualAccountTrxType='C',
customerNo=option.customer_no,
virtualAccountName=option.va_name,
trxId=option.trx_id,
totalAmount=dict(value=option.amount, currency='IDR'),
expiredDate=expired_str,
additionalInfo=dict(channel=option.channel))
with open(option.private_file, 'rb') as f:
private_bytes = f.read()
signature = generator(private_bytes, data, now)
signature_b64 = b64encode(signature)
signature_b64 = signature_b64.decode('utf-8')
headers = {
'X-TIMESTAMP': now_str,
'X-SIGNATURE': signature_b64,
'X-PARTNER-ID': option.partner_id,
}
print('Request')
print('URL:', url)
print('Headers:', headers)
print('Body:', data)
response = requests.post(url, headers=headers, json=data)
print()
print('Response')
print('HTTP Status:', response.status_code)
print('Headers:', response.headers)
try:
r = response.json()
except json.decoder.JSONDecodeError:
r = response.text
print('Body:', r)
[app:main]
use = egg:snap-server
timezone = Asia/Jakarta
# Di production isi public.pem disimpan di database
# Susunan: user_id,partner_id,public_key_file
users =
1,1234,public.pem
# Ini bagian dari path
snap_version = v1.0
[server:main]
use = egg:waitress#main
listen = 0.0.0.0:6543
threads = 12
[loggers]
keys = root
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = DEBUG
handlers = console
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s
[project]
name = 'snap-server'
version = '1.0'
dependencies = [
'pyramid',
'waitress',
]
[tool.setuptools.packages.find]
include = ['snap_server']
[project.entry-points."paste.app_factory"]
main = 'snap_server:main'
\ No newline at end of file
import os
from base64 import b64decode
from pyramid.config import Configurator
from pyramid.authentication import IAuthenticationPolicy
from pyramid.authorization import ACLAuthorizationPolicy
from zope.interface import implementer
from winpay.encrypt import verify_without_salt
from winpay.signature import create_data
def create_sign_data(request):
return create_data(
request.json_body, request.headers.get('X-Timestamp'),
request.method, request.path)
@implementer(IAuthenticationPolicy)
class AuthenticationPolicy:
def authenticated_userid(self, request):
partner_id = request.headers.get('X-Partner-Id')
settings = request.registry.settings
if partner_id not in settings['public_keys']:
return None
user_id, public_key = settings['public_keys'][partner_id]
signature_b64 = request.headers.get('X-Signature')
signature_b64 = signature_b64.encode('utf-8')
signature = b64decode(signature_b64)
sign_data = create_sign_data(request)
verify_without_salt(public_key, signature, sign_data)
return user_id
# Di production public key disimpan di database
def user_public_keys(settings):
base_dir, _ = os.path.split(settings['config_file'])
d = {}
for user in settings['users'].strip().splitlines():
user_id, partner_id, public_file = user.split(',')
if not os.path.exists(public_file):
public_file = os.path.join(base_dir, public_file)
with open(public_file, 'rb') as f:
public_bytes = f.read()
d[partner_id] = (user_id, public_bytes)
return d
def main(global_config, **settings):
settings['config_file'] = global_config['__file__']
settings['public_keys'] = user_public_keys(settings)
with Configurator(settings=settings) as config:
authn_policy = AuthenticationPolicy()
authz_policy = ACLAuthorizationPolicy()
config.set_authentication_policy(authn_policy)
config.set_authorization_policy(authz_policy)
config.include('pyramid_snap')
config.scan('.views')
return config.make_wsgi_app()
import random
import json
import traceback
from io import StringIO
from logging import getLogger
from pyramid.view import view_config
from pyramid.response import Response
from cryptography.exceptions import InvalidSignature
from pyramid_snap.view import Base
from pyramid_snap.structure import (
SERVICE_VA_CREATE,
RC_UNKNOWN_ERROR,
VA_FIELDS,
)
from pyramid_snap.decorator import (
json_method,
create_response,
)
from pyramid_snap.validation import BaseError
def exception_message():
f = StringIO()
traceback.print_exc(file=f)
s = f.getvalue()
f.close()
return s
@view_config(context=Exception)
def view_exception(exc, request):
log = getLogger('view_exception')
log.error(exception_message())
if isinstance(exc, BaseError):
msg = exc[0]
elif isinstance(exc, InvalidSignature):
msg = 'Signature tidak valid'
else:
msg = 'Ada kesalahan yang belum dipahami'
resp = create_response('00', RC_UNKNOWN_ERROR, msg)
try:
d = resp.json
except json.decoder.JSONDecodeError:
d = [resp.text] # Agar tetap 1 baris
log.error(
'to %s headers %s response %s', request.client_addr,
dict(resp.headers), d)
return resp
def nomor_acak(jml: int) -> str:
return '08899' + ''.join(
str(random.randint(0, 9)) for _ in range(jml))
class API(Base):
@json_method(
route_name='create-va', service_code=SERVICE_VA_CREATE,
fields=VA_FIELDS)
def create_va(self, params: dict) -> dict:
vacc = nomor_acak(10)
vacc_data = params.copy()
vacc_data['virtualAccountNo'] = vacc
return dict(virtualAccountData=vacc_data)
from pyramid import testing
def test_create_va():
import os
from datetime import timedelta
from base64 import b64encode
from opensipkd.waktu import create_now
from pyramid_snap.structure import (
RC_OK,
SERVICE_VA_CREATE,
)
from pyramid_snap.view import (
time_to_str,
create_rc,
)
from winpay.signature import generator
from snap_server.views import API
private_file = os.path.join('..', 'private.pem')
with open(private_file, 'rb') as f:
private_key = f.read()
timestamp = create_now()
expired_timestamp = timestamp + timedelta(1)
timestamp_str = time_to_str(timestamp)
expired_str = time_to_str(expired_timestamp)
trx_id = '122005'
channel = 'MANDIRI'
data = {
'customerNo': '08123456789',
'virtualAccountName': 'Iwan Gunawan',
'trxId': trx_id,
'totalAmount': {
'value': 10000,
'currency': 'IDR'},
'virtualAccountTrxType': 'c',
'expiredDate': expired_str,
'additionalInfo': {'channel': channel}}
signature = generator(private_key, data, timestamp)
signature_b64 = b64encode(signature)
signature_b64 = signature_b64.decode('utf-8')
headers = {
'X-Timestamp': timestamp_str,
'X-Signature': signature_b64,
'X-Partner-Id': '1234',
'X-External-Id': trx_id,
'Channel-Id': channel}
request = testing.DummyRequest(
client_addr='127.0.0.1', method='POST', json_body=data,
headers=headers)
api = API(request)
resp = api.create_va()
assert resp.status_code == RC_OK[0]
assert resp.json_body['responseCode'] == create_rc(
RC_OK, SERVICE_VA_CREATE)
[project]
name = 'pyramid-snap'
version = '1.0'
dependencies = [
'pyramid',
'pytz',
]
requires-python = '>= 3.9'
authors = [
{name='Owo Sugiana', email='sugiana@gmail.com'},
]
description = 'Standar Nasional Open API Pembayaran'
readme = 'README.rst'
license = {text='PostgreSQL License'}
classifiers = [
'Programming Language :: Python',
'Framework :: Pylons',
'Topic :: Internet :: WWW/HTTP :: WSGI :: Application',
]
[tool.setuptools.packages.find]
include = ['pyramid_snap']
[project.optional-dependencies]
dev = [
'pytest',
'requests',
'winpay-client @ git+https://git.opensipkd.com/sugiana/winpay-client',
]
VERSION = '1.0'
def includeme(config):
version = config.registry.settings.get('snap_version', VERSION)
path = f'/{version}/transfer-va/create-va'
# https://apidevportal.aspi-indonesia.or.id/api-services/transfer-kredit/virtual-account
config.add_route('create-va', path)
from functools import wraps
from logging import getLogger
import venusian
from .structure import (
HEADER_FIELDS,
RC_OK,
)
from .validation import (
field_validation,
header_validation,
)
from .view import create_response
# View decorator
class json_method:
def __init__(self, **kw):
self.kw = kw
def __call__(self, func):
kw = self.kw.copy()
# add_view() tidak bisa menerima parameter berikut ini
depth = kw.pop('_depth', 0)
service_code = kw.pop('service_code', None)
fields = kw.pop('fields', None)
def callback(context, name, ob):
config = context.config.with_package(info.module)
config.add_view(
view=ob, renderer='json', request_method='POST', **kw)
info = venusian.attach(
func, callback, category='pyramid', depth=depth + 1)
if info.scope == 'class':
# ensure that attr is set if decorating a class method
kw.setdefault('attr', func.__name__)
kw['_info'] = info.codeinfo # fbo action_method
@wraps(func)
def wrapper(cls):
request = cls.request
log = getLogger(func.__name__)
log.info(
'from %s headers %s request %s', request.client_addr,
dict(request.headers), request.json_body)
if request.authenticated_userid:
log.info('user_id %s', request.authenticated_userid)
header_validation(request.headers, HEADER_FIELDS)
params = field_validation(request.json_body, fields)
r = func(cls, params)
resp = create_response(service_code, RC_OK, 'Berhasil', r, request)
log.info(
'to %s headers %s response %s', request.client_addr,
dict(resp.headers), r)
return resp
return wrapper
HTTP_OK = 200
HTTP_BAD_REQUEST = 400
HTTP_SYSTEM_ERROR = 500
# Case Codes
RC_OK = (HTTP_OK, '00')
RC_MISSING_FIELD = (HTTP_BAD_REQUEST, '02')
RC_UNKNOWN_ERROR = (HTTP_SYSTEM_ERROR, '01')
# Service Codes
# https://apidevportal.aspi-indonesia.or.id/api-services/transfer-kredit/virtual-account
SERVICE_VA_CREATE = '27'
# https://apidevportal.aspi-indonesia.or.id/api-services/transfer-kredit/account-inquiry
SERVICE_VA_INQUIRY = '24'
SERVICE_VA_PAYMENT = '25'
# Fields
# Virtual Account
VA_FIELDS = dict(
virtualAccountTrxType='C',
customerNo='1234567890',
virtualAccountName='Jokul Doe',
trxId='abcd1234',
totalAmount=dict(value=12345678, currency='IDR'),
expiredDate='2020-12-31T23:59:59+07:00',
additionalInfo=dict(channel='MANDIRI'))
HEADER_FIELDS = [
'X-Timestamp',
'X-Signature',
'X-Partner-Id',
]
class BaseError(Exception):
pass
class HeaderRequiredError(BaseError):
pass
class FieldRequiredError(BaseError):
pass
def field_validation(d: dict, ref: dict):
r = dict()
for field, ref_value in ref.items():
if field not in d:
msg = f'Field {field} wajib ada'
raise FieldRequiredError(msg)
client_value = d[field]
if isinstance(ref_value, dict):
client_value = field_validation(client_value, ref_value)
r[field] = client_value
return r
def header_validation(headers: dict, ref: list):
for field in ref:
if field not in headers:
msg = f'Header {field} wajib ada'
raise HeaderRequiredError(msg)
from datetime import datetime
import pytz
from pyramid.response import Response
def time_to_str(t: datetime) -> str:
s = t.strftime('%Y-%m-%dT%H:%M:%S%z')
return s[:-2] + ':' + s[-2:]
def str_to_time(s: str) -> datetime:
t = s.split(':')
s = ':'.join(t[:-1]) + t[-1]
return datetime.strptime(s, '%Y-%m-%dT%H:%M:%S%z')
def create_now(timezone: str) -> datetime:
tz = pytz.timezone(timezone)
return datetime.now(tz)
def create_rc(rc: tuple, service_code: str):
http_status, case_code = rc
return f'{http_status}{service_code}{case_code}'
def create_response(
service_code: str, rc: tuple, msg='', data={}, headers={},
request=None) -> dict:
settings = request and request.registry.settings
timezone = settings and settings.get('timezone') or 'Asia/Jakarta'
now = create_now(timezone)
headers = {
'Content-Type': 'application/json',
'X-Timestamp': time_to_str(now)}
r = data.copy()
r.update(dict(
responseCode=create_rc(rc, service_code),
responseMessage=msg))
# https://docs.pylonsproject.org/projects/pyramid_cookbook/en/latest/pylons/exceptions.html
return Response(headers=headers, json=r, status=rc[0])
class Base:
def __init__(self, request):
self.request = request
import pytest
def test_ok():
from pyramid_snap.structure import VA_FIELDS
from pyramid_snap.validation import field_validation
d = VA_FIELDS.copy()
d.update(dict(customerNo='1234'))
r = field_validation(d, VA_FIELDS)
assert d == r
def test_kurang_field():
from pyramid_snap.structure import VA_FIELDS
from pyramid_snap.validation import (
FieldRequiredError,
field_validation,
)
fields = list(VA_FIELDS)
d = {fields[1]: '5678'}
with pytest.raises(FieldRequiredError) as info:
field_validation(d, VA_FIELDS)
field = fields[0]
msg = f'Field {field} wajib ada'
assert str(info.value) == msg
def test_kurang_header():
from pyramid_snap.structure import HEADER_FIELDS
from pyramid_snap.validation import (
HeaderRequiredError,
header_validation,
)
fields = list(HEADER_FIELDS)
d = {fields[1]: '1234'}
with pytest.raises(HeaderRequiredError) as info:
header_validation(d, HEADER_FIELDS)
field = fields[0]
msg = f'Header {field} wajib ada'
assert str(info.value) == msg
def test_time_to_str():
from pyramid_snap.view import time_to_str
from datetime import datetime
import pytz
timezone = 'Asia/Jakarta'
tz = pytz.timezone(timezone)
t = datetime(2025, 3, 16, 8, 37, 27, tzinfo=tz)
assert time_to_str(t) == '2025-03-16T08:37:27+07:07'
def test_str_to_time():
from pyramid_snap.view import str_to_time
from datetime import datetime
import pytz
timezone = 'Asia/Jakarta'
tz = pytz.timezone(timezone)
t = datetime(2025, 3, 16, 8, 37, 27, tzinfo=tz)
s = '2025-03-16T08:37:27+07:07'
assert str_to_time(s) == t
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!