server.py
4.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import sys
import asyncio
import json
from logging import getLogger
from configparser import ConfigParser
import uvicorn
import websockets
from asgiref.wsgi import WsgiToAsgi
from pyramid.config import Configurator
from .exceptions import (
BaseError,
exception_message,
)
from .read_conf import get_module_object
from .logger import setup_logging
# Antrian pesan dari / untuk websocket
# key: client ID
# value: list of dict (
# 'status': 'receive' / 'send'
# 'data': dict
# )
ws_data = dict()
class ExtendedWsgiToAsgi(WsgiToAsgi):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.protocol_router = {'http': {}, 'websocket': {}}
async def __call__(self, scope, *args, **kwargs):
protocol = scope['type']
path = scope['path']
try:
consumer = self.protocol_router[protocol][path]
except KeyError:
consumer = None
if consumer is not None:
await consumer(scope, *args, **kwargs)
await super().__call__(scope, *args, **kwargs)
if consumer is not None:
await consumer(scope, *args, **kwargs)
try:
await super().__call__(scope, *args, **kwargs)
except ValueError as e:
# The developer may wish to improve handling of this exception.
# See https://github.com/Pylons/pyramid_cookbook/issues/225 and
# https://asgi.readthedocs.io/en/latest/specs/www.html#websocket
pass
except Exception as e:
raise e
def route(self, rule, *args, **kwargs):
try:
protocol = kwargs['protocol']
except KeyError:
raise Exception(
'You must define a protocol type for an ASGI handler')
def _route(func):
self.protocol_router[protocol][rule] = func
return _route
def ws_queue(status, client_id, data):
if client_id not in ws_data:
ws_data[client_id] = []
d = dict(status=status, data=data)
ws_data[client_id].append(d)
def main(argv=sys.argv[1:]):
conf_file = argv[0]
# Hentikan setup_logging jika ingin mendapatkan pesan kesalahan yang belum
# ditangkap
if not sys.argv[2:]:
setup_logging(conf_file)
log = getLogger('main')
conf = ConfigParser()
conf.read(conf_file)
cf = dict(conf.items('main'))
module = get_module_object(cf['module'])
module.init(cf)
# Configure a normal WSGI app then wrap it with WSGI -> ASGI class
with Configurator(settings=cf) as config:
wsgi_app = config.make_wsgi_app()
app = ExtendedWsgiToAsgi(wsgi_app)
@app.route('/ws', protocol='websocket')
async def main_websocket(scope, receive, send):
async def kirim(d: dict):
log.info(f'{ip} {mem_id} Send {d}')
await send(d)
async def kirim_pesan(d: dict):
text = json.dumps(d)
await kirim({'type': 'websocket.send', 'text': text})
async def run_queue():
if client_id and client_id in ws_data:
while ws_data[client_id]:
q = ws_data[client_id][0]
del ws_data[client_id][0]
if q['status'] == 'send':
await kirim_pesan(q['data'])
async def login(d: dict):
try:
d = module.login(d)
client_id = d['client_id']
ws_data[client_id] = []
except BaseError as e:
d = dict(code=e.code, message=e.message)
except Exception:
msg = exception_message()
log.error(msg)
d = dict(code=91, message='Login gagal')
log.info(f'{ip} {mem_id} Encode JSON {d}')
await kirim_pesan(d)
return client_id
first = True
ip = scope['client'][0]
mem_id = id(scope)
client_id = None
while True:
try:
async with asyncio.timeout(2):
message = await receive()
except asyncio.TimeoutError:
await run_queue()
continue
log.info(f'{ip} {mem_id} Receive {message}')
if message['type'] == 'websocket.connect':
await kirim({'type': 'websocket.accept'})
elif message['type'] == 'websocket.receive':
text = message.get('text')
d = json.loads(text)
log.info(f'{ip} {mem_id} Decode JSON {d}')
if first:
first = False
client_id = await login(d)
if not client_id:
break
elif message['type'] == 'websocket.disconnect':
if client_id in ws_data:
del ws_data[client_id]
break
log.info(f"Listen {cf['ip']} port {cf['port']}")
uvicorn.run(app, host=cf['ip'], port=int(cf['port']), log_level='info')