Commit 6c90ccc2 by Owo Sugiana

Kali pertama

0 parents
test-*
*egg-info
__pycache__
0.1 2023-03-07
--------------
- Kali pertama
WebSocket untuk JSON
====================
Ini adalah WebSocket server dengan JSON sebagai bentuk datanya. Dirancang
fleksibel untuk berbagai spesifikasi JSON. Dia juga memiliki web service biasa
untuk mengirim pesan ke WebSocket client melalui JSON-RPC.
Ini membutuhkan Python versi 3.11 ke atas.
Pemasangan::
$ python3.11 -m venv ~/env
$ ~/env/bin/pip install --upgrade pip wheel
$ cd async-web
$ ~/env/bin/pip install -e .
Jalankan server::
$ ~/env/bin/async_server server.ini
Jalankan client::
$ ~/env/bin/async_client client.ini
Setelah terhubung cobalah mengirim pesan melalui JSON-RPC::
$ ~/env/bin/python contrib/json-rpc-client.py
Selamat mencoba.
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
import sys
import asyncio
import json
from configparser import ConfigParser
import websockets
from .read_conf import get_module_object
async def main_loop(conf, module):
async with websockets.connect(conf['url']) as websocket:
first = True
while True:
await asyncio.sleep(1)
if first:
first = False
d = module.login()
else:
d = module.get_data()
if d:
raw = json.dumps(d)
await websocket.send(raw)
print(f'Send {[raw]}')
else:
try:
async with asyncio.timeout(2):
raw = await websocket.recv()
print(f'Receive {[raw]}')
except asyncio.TimeoutError:
pass
async def main_(conf):
module = get_module_object(conf['module'])
module.init(conf)
await main_loop(conf, module)
def main(argv=sys.argv[1:]):
conf_file = argv[0]
conf = ConfigParser()
conf.read(conf_file)
cf = dict(conf.items('main'))
obj = main_(cf)
asyncio.run(obj, debug=True)
from io import StringIO
import traceback
def exception_message():
with StringIO() as f:
traceback.print_exc(file=f)
return f.getvalue()
class BaseError(Exception):
def __init__(self, code, message):
self.code = code
self.message = message
from datetime import datetime
from time import time
import websockets
registry = dict()
def update_connection_time():
registry['connection_time'] = time()
def echo():
s = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return dict(action='echo', time=s)
def login():
update_connection_time()
conf = registry['conf']
return dict(action='login', key=conf['key'])
def get_data():
if time() - registry['connection_time'] > 30:
update_connection_time()
return echo()
def parse(d: dict):
update_connection_time()
def init(conf):
registry['conf'] = conf
from async_web.exceptions import BaseError
from async_web.read_conf import str2dict
keys = dict()
ERR_KEY = 91
ERR_ACTION = 76
ERR_OTHER = 76
class ErrKey(BaseError):
def __init__(self, message):
super().__init__(ERR_KEY, message)
class ErrAction(BaseError):
def __init__(self, message):
super().__init__(ERR_ACTION, message)
def ack(code=0, msg='OK', result=dict()):
r = dict(result)
r['code'] = code
r['message'] = msg
return r
def ack_from_exception(e: BaseError):
return ack(e.code, e.message)
def ack_other(msg: str):
return ack(ERR_OTHER, message=msg)
def login(d: dict):
if 'key' not in d:
raise ErrKey('Field key tidak ditemukan')
if d['key'] not in keys:
raise ErrKey('key tidak terdaftar')
client_id = keys[d['key']]
return dict(action='login', client_id=client_id, code=0, message='OK')
def parse(d: dict):
action = d.get('action')
if not action:
raise ErrAction('Field action tidak ditemukan')
if action == 'echo':
return ack()
msg = f'ERROR action {action} tidak dipahami'
raise ErrAction(msg)
def init(conf: dict):
d = str2dict(conf['keys'])
keys.update(d)
from configparser import RawConfigParser
import logging
import logging.config
def setup_logging(conf_file):
conf = RawConfigParser()
conf.read(conf_file)
d = {
'version': 1,
'formatters': {},
'handlers': {},
'loggers': {},
}
for section in conf.sections():
if section.find('formatter_') == 0:
name = section.split('formatter_')[1]
data = {'format': conf.get(section, 'format')}
d['formatters'][name] = data
elif section.find('handler_') == 0:
name = section.split('handler_')[1]
data = {'formatter': conf.get(section, 'formatter')}
data['class'] = 'logging.' + conf.get(section, 'class')
if conf.has_option(section, 'stream'):
data['stream'] = 'ext://' + conf.get(section, 'stream')
else:
data['filename'] = conf.get(section, 'filename')
d['handlers'][name] = data
elif section.find('logger_') == 0:
name = section.split('logger_')[1]
if name == 'root':
name = ''
data = {'level': conf.get(section, 'level')}
value = conf.get(section, 'handlers')
data['handlers'] = [x.strip() for x in value.split(',')]
d['loggers'][name] = data
logging.config.dictConfig(d)
def get_module_object(name):
module_obj = __import__(name)
sub_obj = None
for sub in name.split('.')[1:]:
if sub_obj:
sub_obj = getattr(sub_obj, sub)
else:
sub_obj = getattr(module_obj, sub)
return sub_obj
def str2dict(s):
r = dict()
for token in s.split():
key, value = token.split(':')
r[key] = value
return r
import sys
import asyncio
import json
from logging import getLogger
from configparser import ConfigParser
import uvicorn
import websockets
from asgiref.wsgi import WsgiToAsgi
from pyramid.config import Configurator
from .exceptions import (
BaseError,
exception_message,
)
from .read_conf import get_module_object
from .logger import setup_logging
# Antrian pesan dari / untuk websocket
# key: client ID
# value: list of dict (
# 'status': 'receive' / 'send'
# 'data': dict
# )
ws_data = dict()
class ExtendedWsgiToAsgi(WsgiToAsgi):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.protocol_router = {'http': {}, 'websocket': {}}
async def __call__(self, scope, *args, **kwargs):
protocol = scope['type']
path = scope['path']
try:
consumer = self.protocol_router[protocol][path]
except KeyError:
consumer = None
if consumer is not None:
await consumer(scope, *args, **kwargs)
await super().__call__(scope, *args, **kwargs)
if consumer is not None:
await consumer(scope, *args, **kwargs)
try:
await super().__call__(scope, *args, **kwargs)
except ValueError as e:
# The developer may wish to improve handling of this exception.
# See https://github.com/Pylons/pyramid_cookbook/issues/225 and
# https://asgi.readthedocs.io/en/latest/specs/www.html#websocket
pass
except Exception as e:
raise e
def route(self, rule, *args, **kwargs):
try:
protocol = kwargs['protocol']
except KeyError:
raise Exception(
'You must define a protocol type for an ASGI handler')
def _route(func):
self.protocol_router[protocol][rule] = func
return _route
def ws_queue(status, client_id, data):
if client_id not in ws_data:
ws_data[client_id] = []
d = dict(status=status, data=data)
ws_data[client_id].append(d)
def main(argv=sys.argv[1:]):
conf_file = argv[0]
# Hentikan setup_logging jika ingin mendapatkan pesan kesalahan yang belum
# ditangkap
if not sys.argv[2:]:
setup_logging(conf_file)
log = getLogger('main')
conf = ConfigParser()
conf.read(conf_file)
cf = dict(conf.items('main'))
module = get_module_object(cf['module'])
module.init(cf)
# Configure a normal WSGI app then wrap it with WSGI -> ASGI class
with Configurator(settings=cf) as config:
wsgi_app = config.make_wsgi_app()
app = ExtendedWsgiToAsgi(wsgi_app)
@app.route('/ws', protocol='websocket')
async def main_websocket(scope, receive, send):
async def kirim(d: dict):
log.info(f'{ip} {mem_id} Send {d}')
await send(d)
async def kirim_pesan(d: dict):
text = json.dumps(d)
await kirim({'type': 'websocket.send', 'text': text})
async def run_queue():
if client_id and client_id in ws_data:
while ws_data[client_id]:
q = ws_data[client_id][0]
del ws_data[client_id][0]
if q['status'] == 'send':
await kirim_pesan(q['data'])
async def login(d: dict):
try:
d = module.login(d)
client_id = d['client_id']
ws_data[client_id] = []
except BaseError as e:
d = dict(code=e.code, message=e.message)
except Exception:
msg = exception_message()
log.error(msg)
d = dict(code=91, message='Login gagal')
log.info(f'{ip} {mem_id} Encode JSON {d}')
await kirim_pesan(d)
return client_id
first = True
ip = scope['client'][0]
mem_id = id(scope)
client_id = None
while True:
try:
async with asyncio.timeout(2):
message = await receive()
except asyncio.TimeoutError:
await run_queue()
continue
log.info(f'{ip} {mem_id} Receive {message}')
if message['type'] == 'websocket.connect':
await kirim({'type': 'websocket.accept'})
elif message['type'] == 'websocket.receive':
text = message.get('text')
d = json.loads(text)
log.info(f'{ip} {mem_id} Decode JSON {d}')
if first:
first = False
client_id = await login(d)
if not client_id:
break
elif message['type'] == 'websocket.disconnect':
if client_id in ws_data:
del ws_data[client_id]
break
log.info(f"Listen {cf['ip']} port {cf['port']}")
uvicorn.run(app, host=cf['ip'], port=int(cf['port']), log_level='info')
from logging import getLogger
from pyramid_rpc.jsonrpc import jsonrpc_method
from ..server import (
ws_data,
ws_queue,
)
@jsonrpc_method(endpoint='rpc')
def echo(request, p):
log = getLogger('echo')
ip = request.remote_addr
mem_id = id(request)
log.info(f'{ip} {mem_id} RPC Request echo {p}')
if p['client_id'] in ws_data:
# Untuk websocket client seperti EDC
d = dict(action='echo', id=0)
ws_queue('send', p['client_id'], d)
r = dict(code=0, message='OK')
else:
r = dict(code=91, message='offline')
log.info(f'{ip} {mem_id} RPC Response {r}')
return r
def includeme(config):
config.add_jsonrpc_endpoint('rpc', '/rpc')
config.scan('.')
[main]
module = async_web.handlers.default_client
url = ws://localhost:8000/ws
# Data untuk module
key = ABCD
import requests
url = 'http://localhost:8000/rpc'
data = dict(
method='echo',
params=[dict(client_id='1')],
jsonrpc='2.0',
id=0)
print(url, data)
r = requests.post(url, json=data)
if r.status_code == 200:
print(r.json())
else:
print(f'Status code {r.status_code}')
[main]
module = async_web.handlers.default_server
ip = 0.0.0.0
port = 8000
# Data untuk handlers/default_server.py
keys =
ABCD:1
EFGH:2
pyramid.includes =
pyramid_rpc.jsonrpc
async_web.views.echo
[formatter_generic]
format = %(asctime)s %(levelname)s %(name)s %(message)s
[formatter_simple]
format = %(asctime)s %(levelname)s %(message)s
[handler_console]
class = StreamHandler
stream = sys.stdout
formatter = generic
[handler_file]
class = FileHandler
filename = test-server.log
formatter = simple
[logger_root]
handlers = console, file
level = DEBUG
import os
import sys
import subprocess
from setuptools import setup, find_packages
here = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(here, 'README.rst')) as f:
README = f.read()
with open(os.path.join(here, 'CHANGES.txt')) as f:
CHANGES = f.read()
line = CHANGES.splitlines()[0]
version = line.split()[0]
requires = [
'uvicorn',
'asgiref',
'websockets',
'pyramid',
'pyramid_rpc',
]
setup(
name='async_web',
version=version,
description='Daemon Websocket untuk JSON',
long_description=README + '\n\n' + CHANGES,
classifiers=[
'Programming Language :: Python',
'Framework :: Pyramid',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Internet :: WWW/HTTP :: WSGI :: Application',
],
author='Owo Sugiana',
author_email='sugiana@gmail.com',
url='https://git.opensipkd.com/sugiana/async-web',
keywords='websocket json',
packages=find_packages(),
include_package_data=True,
zip_safe=False,
install_requires=requires,
entry_points={
'console_scripts': [
'async_server = async_web.server:main',
'async_client = async_web.client:main',
]
},
)
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!