# server.py
import sys
import asyncio
import json
from logging import getLogger
from configparser import ConfigParser
import uvicorn
import websockets
from asgiref.wsgi import WsgiToAsgi
from pyramid.config import Configurator
from .exceptions import (
    BaseError,
    exception_message,
    )
from .read_conf import get_module_object
from .logger import setup_logging


# Queue of messages from / to the websocket clients.
# key: client ID
# value: list of dict(
#     'status': 'receive' / 'send',
#     'data': dict,
# )
ws_data = {}


class ExtendedWsgiToAsgi(WsgiToAsgi):
    """WSGI-to-ASGI adapter that can also dispatch to native ASGI consumers.

    Consumers are registered per protocol and exact path with ``route()``.
    For every incoming scope the matching consumer (if any) is awaited once,
    then the wrapped WSGI application is invoked once.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # protocol name -> {path: consumer coroutine function}
        self.protocol_router = {'http': {}, 'websocket': {}}

    async def __call__(self, scope, *args, **kwargs):
        """Handle one ASGI scope.

        The consumer registered for ``scope['type']`` / ``scope['path']`` is
        awaited first when there is one, then the call is forwarded to the
        wrapped WSGI application. Each of the two runs exactly once per scope.
        """
        protocol = scope['type']
        path = scope['path']
        try:
            consumer = self.protocol_router[protocol][path]
        except KeyError:
            # Unknown protocol or no consumer registered for this path.
            consumer = None
        if consumer is not None:
            await consumer(scope, *args, **kwargs)
        try:
            await super().__call__(scope, *args, **kwargs)
        except ValueError:
            # The WSGI adapter rejects scopes it cannot serve (e.g. the
            # websocket scope a consumer has just finished handling).
            # The developer may wish to improve handling of this exception.
            # See https://github.com/Pylons/pyramid_cookbook/issues/225 and
            # https://asgi.readthedocs.io/en/latest/specs/www.html#websocket
            pass

    def route(self, rule, *args, **kwargs):
        """Return a decorator registering an ASGI consumer for ``rule``.

        Args:
            rule: exact request path the consumer is bound to.
            protocol: required keyword argument, a key of
                ``protocol_router`` ('http' or 'websocket').

        Raises:
            Exception: if the ``protocol`` keyword argument is missing.
        """
        try:
            protocol = kwargs['protocol']
        except KeyError:
            raise Exception(
                    'You must define a protocol type for an ASGI handler')

        def _route(func):
            self.protocol_router[protocol][rule] = func
            # Hand the function back so the decorated name stays bound to it
            # instead of being replaced by None.
            return func

        return _route


def ws_queue(status, client_id, data):
    """Append a message to the queue of ``client_id`` in ``ws_data``.

    The queue is created on first use. Each entry is stored as a dict with
    the keys ``'status'`` and ``'data'``.
    """
    entry = {'status': status, 'data': data}
    ws_data.setdefault(client_id, []).append(entry)


def main(argv=sys.argv[1:]):
    """Read the configuration file and serve the application with uvicorn.

    Args:
        argv: command-line arguments. ``argv[0]`` is the path of the
            configuration file. Any additional argument disables
            ``setup_logging``.

    The ``main`` section of the configuration file must provide ``module``,
    ``ip`` and ``port``. The object resolved from ``module`` is initialised
    with ``init(cf)`` and its ``login(d)`` is called with the first message
    received on each websocket connection.
    """
    conf_file = argv[0]
    # Pass an extra argument to skip setup_logging when you want to see
    # error messages that have not been caught.
    # Look at argv (not sys.argv) so an explicitly passed argument list is
    # honoured as well.
    if not argv[1:]:
        setup_logging(conf_file)
    log = getLogger('main')
    conf = ConfigParser()
    conf.read(conf_file)
    cf = dict(conf.items('main'))
    module = get_module_object(cf['module'])
    module.init(cf)
    # Configure a normal WSGI app then wrap it with WSGI -> ASGI class
    with Configurator(settings=cf) as config:
        wsgi_app = config.make_wsgi_app()

    app = ExtendedWsgiToAsgi(wsgi_app)

    @app.route('/ws', protocol='websocket')
    async def main_websocket(scope, receive, send):
        """Serve one websocket connection on ``/ws``.

        The first text message is treated as the login request. Afterwards,
        whenever no message arrives within 2 seconds, the queued messages
        of the logged-in client are sent.
        """
        async def kirim(d: dict):
            # Send a raw ASGI event, logging it first.
            log.info(f'{ip} {mem_id} Send {d}')
            await send(d)

        async def kirim_pesan(d: dict):
            # Send a dict to the client as a JSON text frame.
            text = json.dumps(d)
            await kirim({'type': 'websocket.send', 'text': text})

        async def run_queue():
            # Drain this client's queue; only 'send' entries are transmitted,
            # anything else is dropped.
            if client_id and client_id in ws_data:
                while ws_data[client_id]:
                    q = ws_data[client_id].pop(0)
                    if q['status'] == 'send':
                        await kirim_pesan(q['data'])

        async def login(d: dict):
            # Returns the client ID on success, None when the login failed.
            # The result starts as None so that a failed login no longer ends
            # in UnboundLocalError at the return statement.
            new_client_id = None
            try:
                d = module.login(d)
                candidate = d['client_id']
                ws_data[candidate] = []
            except BaseError as e:
                d = dict(code=e.code, message=e.message)
            except Exception:
                msg = exception_message()
                log.error(msg)
                d = dict(code=91, message='Login gagal')
            else:
                new_client_id = candidate
            log.info(f'{ip} {mem_id} Encode JSON {d}')
            await kirim_pesan(d)
            return new_client_id

        first = True
        ip = scope['client'][0]
        mem_id = id(scope)
        client_id = None
        try:
            while True:
                try:
                    async with asyncio.timeout(2):
                        message = await receive()
                except asyncio.TimeoutError:
                    # Nothing received for 2 seconds: flush pending messages.
                    await run_queue()
                    continue
                log.info(f'{ip} {mem_id} Receive {message}')
                if message['type'] == 'websocket.connect':
                    await kirim({'type': 'websocket.accept'})
                elif message['type'] == 'websocket.receive':
                    text = message.get('text')
                    d = json.loads(text)
                    log.info(f'{ip} {mem_id} Decode JSON {d}')
                    if first:
                        first = False
                        client_id = await login(d)
                        if not client_id:
                            break
                elif message['type'] == 'websocket.disconnect':
                    break
        finally:
            # Remove the client's queue however the handler ends (disconnect,
            # failed login or an exception), so ws_data does not keep
            # entries for connections that are gone.
            ws_data.pop(client_id, None)

    log.info(f"Listen {cf['ip']} port {cf['port']}")
    uvicorn.run(app, host=cf['ip'], port=int(cf['port']), log_level='info')