Commit dee2b132 by Owo Sugiana

Kali pertama

0 parents
*.pem
*.egg-info
build
__pycache__
Winpay Client
=============
Ini adalah kumpulan modul untuk mengakses `Winpay <https://docs.winpay.id/>`_.
Create Virtual Account
----------------------
Buatlah private key::
$ openssl genrsa -out private.pem 2048
$ openssl rsa -in private.pem -pubout -out public.pem
Sampaikan ``public.pem`` ke Winpay. Lalu mintalah Partner ID. Kemudian jalankan::
$ ~/env/bin/winpay_create_va --private-file=private.pem --partner-id=WINPAY-PARTNER-ID --customer-no=08123456789 --va-name="IWAN AGRATEK" --amount=10000 --channel=BNI
Hasilnya::
{'responseCode': '2002700',
'responseMessage': 'Success',
'virtualAccountData': {'additionalInfo': {'channel': 'BNI',
'contractId': 'bi93661e5c-c479-4ee9-b8ac-8ac7a750e6e7'},
'customerNo': '4410066483',
'expiredDate': '2025-03-11T18:25:35+07:00',
'partnerServiceId': ' 988332',
'totalAmount': {'currency': 'IDR', 'value': '10000.00'},
'trxId': '182535',
'virtualAccountName': 'IWAN AGRATEK',
'virtualAccountNo': ' 9883324410066483',
'virtualAccountTrxType': 'c'}}
[build-system]
requires = ['setuptools >= 64']
[project]
name = 'winpay-client'
version = '0.1'
dependencies = [
'cryptography',
'requests',
'opensipkd-hitung @ git+https://git.opensipkd.com/sugiana/opensipkd-hitung',
]
requires-python = '>= 3.9'
authors = [
{name='Owo Sugiana', email='sugiana@gmail.com'},
]
description = 'Winpay Client'
readme = 'README.rst'
classifiers = [
'Programming Language :: Python :: 3',
'Operating System :: OS Independent',
]
[project.scripts]
ssl_encrypt = 'winpay.scripts.encrypt:encrypt'
winpay_signature = 'winpay.scripts.encrypt:signature'
winpay_create_va = 'winpay.scripts.encrypt:create_va'
from setuptools import (
setup,
find_packages,
)
setup(packages=find_packages())
File mode changed
import sys
from argparse import ArgumentParser
from base64 import b64encode
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import (
hashes,
serialization,
)
from cryptography.hazmat.primitives.asymmetric import padding
backend = default_backend()
hash_sha256 = hashes.SHA256()
def sign(private_key: bytes, data: bytes) -> bytes:
key = serialization.load_pem_private_key(
private_key, password=None, backend=backend)
mgf = padding.MGF1(hash_sha256)
padding_obj = padding.PSS(mgf=mgf, salt_length=padding.PSS.MAX_LENGTH)
return key.sign(data, padding_obj, hash_sha256)
def verify(public_key: bytes, data_signed: bytes, data: bytes):
key = serialization.load_pem_public_key(public_key, backend=backend)
mgf = padding.MGF1(hash_sha256)
padding_obj = padding.PSS(mgf=mgf, salt_length=padding.PSS.MAX_LENGTH)
key.verify(data_signed, data, padding_obj, hash_sha256)
def sign_without_salt(private_key: bytes, data: bytes) -> bytes:
key = serialization.load_pem_private_key(
private_key, password=None, backend=backend)
return key.sign(data, padding.PKCS1v15(), hash_sha256)
def verify_without_salt(public_key: bytes, data_signed: bytes, data: bytes):
key = serialization.load_pem_public_key(public_key, backend=backend)
key.verify(data_signed, data, padding.PKCS1v15(), hash_sha256)
def main(argv=sys.argv[1:]):
pars = ArgumentParser()
pars.add_argument('--private-file', required=True)
pars.add_argument('--data-file', required=True)
pars.add_argument('--without-salt', action='store_true')
pars.add_argument('--public-file')
option = pars.parse_args(argv)
with open(option.private_file, 'rb') as f:
private_key = f.read()
with open(option.data_file, 'rb') as f:
data = f.read()
sign_func = option.without_salt and sign_without_salt or sign
data_signed = sign_func(private_key, data)
data_b64 = b64encode(data_signed).decode('utf-8')
print(data_b64)
if not option.public_file:
return
with open(option.public_file, 'rb') as f:
public_key = f.read()
verify_func = option.without_salt and verify_without_salt or verify
verify_func(public_key, data_signed, data)
if __name__ == '__main__':
main()
import sys
import json
from datetime import datetime
from argparse import ArgumentParser
from base64 import b64encode
from pprint import pprint
from opensipkd.waktu import create_now
from winpay.encrypt import (
sign,
sign_without_salt,
verify,
verify_without_salt,
)
from winpay.signature import generator
from winpay.va import (
create,
HttpErr,
)
def encrypt(argv=sys.argv[1:]):
pars = ArgumentParser()
pars.add_argument('--private-file', required=True)
pars.add_argument('--data-file', required=True)
pars.add_argument('--without-salt', action='store_true')
pars.add_argument('--public-file')
option = pars.parse_args(argv)
with open(option.private_file, 'rb') as f:
private_key = f.read()
with open(option.data_file, 'rb') as f:
data = f.read()
sign_func = option.without_salt and sign_without_salt or sign
data_signed = sign_func(private_key, data)
data_b64 = b64encode(data_signed).decode('utf-8')
print(data_b64)
if not option.public_file:
return
with open(option.public_file, 'rb') as f:
public_key = f.read()
verify_func = option.without_salt and verify_without_salt or verify
verify_func(public_key, data_signed, data)
def signature(argv=sys.argv[1:]):
pars = ArgumentParser()
pars.add_argument('--private-file', required=True)
pars.add_argument('--json-file', required=True)
option = pars.parse_args(sys.argv[1:])
with open(option.private_file, 'rb') as f:
private_bytes = f.read()
with open(option.json_file) as f:
data = json.loads(f.read())
timestamp = create_now()
sign_bytes = generator(private_bytes, data, timestamp)
sign_b64 = b64encode(sign_bytes).decode('utf-8')
print(sign_b64)
def create_va(argv=sys.argv[1:]):
url = 'https://sandbox-api.bmstaging.id/snap/v1.0/transfer-va/create-va'
help_url = f'default {url}'
expired_days = 1
help_expired = f'default {expired_days}'
channels = [
'BRI', 'BNI', 'MANDIRI', 'PERMATA', 'BSI', 'MUAMALAT', 'BCA', 'CIMB',
'SINARMAS', 'BNC', 'INDOMARET', 'ALFAMART']
pars = ArgumentParser()
pars.add_argument('--private-file', required=True)
pars.add_argument('--partner-id', required=True)
pars.add_argument('--customer-no', required=True)
pars.add_argument('--va-name', required=True)
pars.add_argument('--amount', type=int, required=True)
pars.add_argument('--channel', choices=channels)
pars.add_argument('--url', default=url, help=help_url)
pars.add_argument(
'--expired-days', type=int, default=expired_days, help=help_expired)
pars.add_argument('--trx-id')
option = pars.parse_args(sys.argv[1:])
with open(option.private_file, 'rb') as f:
private_key = f.read()
if option.trx_id:
trx_id = option.trx_id
else:
trx_id = datetime.now().strftime('%H%M%S')
try:
r = create(
option.url, private_key, option.partner_id, option.customer_no,
option.va_name, option.amount, option.channel, trx_id,
option.expired_days)
pprint(r)
except HttpErr as e:
print('ERROR', e)
# Winpay Signature Generator
import hashlib
import json
from .encrypt import sign_without_salt
def time_to_str(t: 'datetime with time zone'):
s = t.strftime('%Y-%m-%dT%H:%M:%S%z')
return s[:-2] + ':' + s[-2:]
def generator(
private_key: bytes, data: dict, timestamp: 'datetime with time zone',
http_method='POST',
end_point_url='/v1.0/transfer-va/create-va') -> bytes:
body = json.dumps(data, separators=(',', ':'))
body_byte = body.encode('utf-8')
body_hash = hashlib.sha256(body_byte).hexdigest().lower()
timestamp_str = time_to_str(timestamp)
sign_list = [http_method, end_point_url, body_hash, timestamp_str]
sign_str = ':'.join(sign_list)
return sign_without_salt(private_key, sign_str.encode('utf-8'))
# Winpay Create Virtual Account
from base64 import b64encode
from datetime import timedelta
from logging import getLogger
import requests
from opensipkd.waktu import create_now
from .signature import (
generator,
time_to_str,
)
class HttpErr(Exception):
pass
def create(
url: str, private_key: bytes, partner_id: str, customer_no: str,
va_name: str, amount: int, channel: str, trx_id: str, expired_days=1):
log = getLogger('create_va()')
timestamp = create_now()
timestamp_str = time_to_str(timestamp)
expired_timestamp = timestamp + timedelta(expired_days)
expired_str = time_to_str(expired_timestamp)
data = dict(
customerNo=customer_no,
virtualAccountName=va_name,
trxId=trx_id,
totalAmount=dict(value=amount, currency='IDR'),
virtualAccountTrxType='c',
expiredDate=expired_str,
additionalInfo=dict(channel=channel))
signature = generator(private_key, data, timestamp)
signature_b64 = b64encode(signature).decode('utf-8')
headers = {
'X-TIMESTAMP': timestamp_str,
'X-SIGNATURE': signature_b64,
'X-PARTNER-ID': partner_id,
'X-EXTERNAL-ID': trx_id,
'CHANNEL-ID': channel}
log.info(f'Request: Headers {headers}, Data {data}')
r = requests.post(url, headers=headers, json=data)
if r.status_code != 200:
log.error(f'HTTP {r.status_code}, {[r.text]}')
raise HttpErr(r.status_code, r.text)
d = r.json()
log.info(f'Response: {d}')
return d
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!