server.py
6.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import sys
import asyncio
import json
from time import time
from logging import getLogger
from configparser import ConfigParser
import uvicorn
import websockets
from asgiref.wsgi import WsgiToAsgi
from pyramid.config import Configurator
from .exceptions import (
BaseError,
exception_message,
)
from .read_conf import get_module_object
from .logger import setup_logging
# Queue of messages from / for the websocket connections.
# key: client ID
# value: list of dict (
#     'status': 'receive' / 'send'
#     'data': dict
# )
ws_data = {}
class ExtendedWsgiToAsgi(WsgiToAsgi):
    """WSGI-to-ASGI wrapper that can also route scopes to ASGI consumers.

    Consumers are registered per protocol (``'http'`` or ``'websocket'``)
    and per exact path with :meth:`route`. Every incoming scope is first
    offered to the matching consumer, if any, and then handed to the wrapped
    WSGI application.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # protocol name -> {path: consumer callable}
        self.protocol_router = {'http': {}, 'websocket': {}}

    async def __call__(self, scope, *args, **kwargs):
        """Dispatch one ASGI scope.

        The consumer registered for ``(scope['type'], scope['path'])`` is
        awaited exactly once when present; afterwards the scope is passed to
        the wrapped WSGI application exactly once.

        Raises:
            ValueError: Re-raised from the wrapped application for every
                scope type except ``'websocket'``.
        """
        protocol = scope['type']
        # Look the path up defensively: only a (protocol, path) pair that was
        # registered yields a consumer, anything else falls through to WSGI.
        # NOTE(review): ASGI lifespan scopes carry no 'path' key.
        consumer = self.protocol_router.get(protocol, {}).get(
            scope.get('path'))
        if consumer is not None:
            await consumer(scope, *args, **kwargs)
        try:
            await super().__call__(scope, *args, **kwargs)
        except ValueError:
            # The developer may wish to improve handling of this exception.
            # See https://github.com/Pylons/pyramid_cookbook/issues/225 and
            # https://asgi.readthedocs.io/en/latest/specs/www.html#websocket
            # Only the websocket case is tolerated; a ValueError for any
            # other scope type is a real error and must stay visible.
            if protocol != 'websocket':
                raise

    def route(self, rule, *args, **kwargs):
        """Return a decorator registering an ASGI consumer for ``rule``.

        Args:
            rule: Exact request path handled by the consumer.
            **kwargs: Must contain ``protocol`` (``'http'`` or
                ``'websocket'``).

        Raises:
            Exception: If no ``protocol`` keyword argument is given.
        """
        try:
            protocol = kwargs['protocol']
        except KeyError:
            raise Exception(
                'You must define a protocol type for an ASGI handler')

        def _route(func):
            self.protocol_router[protocol][rule] = func
            # Hand the function back so the decorated name stays usable
            # instead of being rebound to None.
            return func
        return _route
def ws_queue(status, client_id, data):
    """Append a message entry to the queue kept for ``client_id``.

    The entry is a dict with the keys ``'status'`` and ``'data'``. A new
    queue (list) is created in ``ws_data`` when the client has none yet.
    """
    entry = {'status': status, 'data': data}
    ws_data.setdefault(client_id, []).append(entry)
def main(argv=None):
    """Run the Pyramid application together with the ``/ws`` websocket.

    The configuration file named by ``argv[0]`` must contain a ``[main]``
    section. This function reads its ``login_timeout``, ``module``, ``ip``
    and ``port`` options; the whole section is also passed to
    ``module.init()`` and to Pyramid as settings.

    The object loaded from the ``module`` option must provide ``init(cf)``
    and a ``Handler(ip)`` class. Per connection its ``login``, ``parse``
    (awaited), ``get_data``, ``save_message`` and ``close`` methods are used.

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]``. Any argument after the configuration file
            disables ``setup_logging``.
    """
    if argv is None:
        # Resolved at call time, not frozen when the module is imported.
        argv = sys.argv[1:]
    conf_file = argv[0]
    # Skip setup_logging (pass any extra argument) when you want to see
    # error messages that have not been caught.
    if not argv[1:]:
        setup_logging(conf_file)
    log = getLogger('main')
    conf = ConfigParser()
    conf.read(conf_file)
    cf = dict(conf.items('main'))
    login_timeout = int(cf['login_timeout'])
    module = get_module_object(cf['module'])
    module.init(cf)
    # Memory IDs of connections that must end because the same client has
    # logged in again on a newer connection.
    old_clients = set()
    # Key: client_id, value: mem_id of the connection owning that client.
    mem_clients = dict()
    # Configure a normal WSGI app then wrap it with WSGI -> ASGI class
    with Configurator(settings=cf) as config:
        wsgi_app = config.make_wsgi_app()
    app = ExtendedWsgiToAsgi(wsgi_app)

    @app.route('/ws', protocol='websocket')
    async def main_websocket(scope, receive, send):
        """Serve one websocket connection until it ends."""

        def log_info(msg, func=log.info):
            # Prefix every line with the peer address, the connection's
            # memory ID and, once logged in, the client ID.
            if client_id:
                msg = f'{ip} {mem_id} Client {client_id} {msg}'
            else:
                msg = f'{ip} {mem_id} {msg}'
            func(msg)

        def log_unknown_error():
            log_info(exception_message(), log.error)

        def save_message(method: str, d: dict):
            # Persisting a message is best effort; a failure is only logged.
            try:
                handler.save_message(method, d)
            except Exception:
                log_unknown_error()

        async def kirim(d: dict):
            """Log and send a raw ASGI event."""
            log_info(f'Send {d}')
            await send(d)

        async def kirim_pesan(d: dict):
            """Send ``d`` to the client as a JSON text frame."""
            text = json.dumps(d)
            save_message('send', d)
            d = {'type': 'websocket.send', 'text': text}
            await kirim(d)

        async def run_queue():
            """Flush queued messages, then send what the handler offers."""
            # Work on the list object itself so that a concurrent removal of
            # the ws_data entry cannot raise KeyError between two awaits.
            queue = ws_data.get(client_id) if client_id else None
            while queue:
                q = queue.pop(0)
                if q['status'] == 'send':
                    await kirim_pesan(q['data'])
            try:
                d = handler.get_data()
                if d:
                    await kirim_pesan(d)
            except Exception:
                log_unknown_error()

        async def login(dc: dict):
            """Authenticate the first message and reply to the client.

            Returns the client ID on success and ``None`` on any failure.
            """
            cid = None
            try:
                dc = handler.login(dc)
                cid = dc['client_id']
                # Ownership lives in mem_clients. ws_data may already hold a
                # queue created by ws_queue() for a client that is not
                # connected, so it must not be used to detect a previous
                # connection.
                old_mem_id = mem_clients.get(cid)
                if old_mem_id is not None:
                    old_clients.add(old_mem_id)
                    log_info(
                        f'Koneksi Client {cid} sebelumnya akan diputus',
                        log.warning)
                ws_data.setdefault(cid, [])
                mem_clients[cid] = mem_id
            except BaseError as e:
                cid = None
                dc = dict(code=e.code, message=e.message)
            except Exception:
                # Never report success after telling the client it failed.
                cid = None
                log_unknown_error()
                dc = dict(code=91, message='Login gagal')
            log_info(f'Encode JSON {dc}')
            await kirim_pesan(dc)
            return cid

        first = True
        ip = scope['client'][0]
        mem_id = id(scope)
        client_id = None
        handler = module.Handler(ip)
        start_time = time()
        try:
            while True:
                if not client_id and time() - start_time > login_timeout:
                    log_info('Login timeout', log.error)
                    break
                if mem_id in old_clients:
                    log_info('Koneksi berakhir karena sudah ada yang baru')
                    break
                try:
                    # Wake up regularly so queued messages get delivered
                    # even while the client stays silent.
                    async with asyncio.timeout(2):
                        message = await receive()
                except asyncio.TimeoutError:
                    await run_queue()
                    continue
                log_info(f'Receive {message}')
                msg_type = message['type']
                if msg_type == 'websocket.connect':
                    await kirim({'type': 'websocket.accept'})
                elif msg_type == 'websocket.receive':
                    try:
                        # 'text' is missing for binary frames, which makes
                        # json.loads raise TypeError.
                        d = json.loads(message.get('text'))
                    except (TypeError, ValueError):
                        log_info('Invalid JSON message', log.error)
                        break
                    log_info(f'Decode JSON {d}')
                    save_message('receive', d)
                    if first:
                        first = False
                        client_id = await login(d)
                        if not client_id:
                            break
                    else:
                        try:
                            await handler.parse(d)
                        except BaseError as e:
                            log_info(e.message, log.error)
                        except Exception:
                            log_unknown_error()
                elif msg_type == 'websocket.disconnect':
                    break
        finally:
            # id() values can be reused, so this connection's memory ID must
            # not outlive it in the bookkeeping.
            old_clients.discard(mem_id)
            # Release only what this connection still owns; a newer
            # connection of the same client keeps its queue.
            owned = [cid for cid, mid in mem_clients.items() if mid == mem_id]
            for cid in owned:
                del mem_clients[cid]
                ws_data.pop(cid, None)
            try:
                handler.close()
            except Exception:
                log_unknown_error()

    log.info(f"Listen {cf['ip']} port {cf['port']}")
    uvicorn.run(app, host=cf['ip'], port=int(cf['port']), log_level='info')