# server.py
import sys
import asyncio
import json
from time import time
from logging import getLogger
from configparser import ConfigParser
import uvicorn
import websockets
from asgiref.wsgi import WsgiToAsgi
from pyramid.config import Configurator
from .exceptions import (
    BaseError,
    exception_message,
    )
from .read_conf import get_module_object
from .logger import setup_logging


# Queue of messages from / for the websocket connections.
# key: client ID
# value: list of dict, each entry shaped as
#           'status': 'receive' / 'send'
#           'data': dict
ws_data = {}


class ExtendedWsgiToAsgi(WsgiToAsgi):
    """WsgiToAsgi wrapper extended with an ASGI consumer protocol router.

    Consumers are registered per protocol (``'http'`` or ``'websocket'``)
    and per exact path with :meth:`route`.  For every incoming scope the
    matching consumer, if any, is awaited once and then the wrapped WSGI
    application is invoked once.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # protocol name -> {path -> ASGI consumer coroutine function}
        self.protocol_router = {'http': {}, 'websocket': {}}

    async def __call__(self, scope, *args, **kwargs):
        """Dispatch ``scope`` to a routed consumer, then to the WSGI app.

        ``scope`` must provide the ``'type'`` and ``'path'`` keys.  The
        remaining positional/keyword arguments (the ASGI ``receive`` and
        ``send`` callables) are forwarded untouched.
        """
        protocol = scope['type']
        path = scope['path']
        try:
            consumer = self.protocol_router[protocol][path]
        except KeyError:
            # Unknown protocol or unrouted path: only the WSGI app runs.
            consumer = None
        if consumer is not None:
            await consumer(scope, *args, **kwargs)
        try:
            await super().__call__(scope, *args, **kwargs)
        except ValueError:
            # Deliberately ignored; any other exception propagates.
            # The developer may wish to improve handling of this exception.
            # See https://github.com/Pylons/pyramid_cookbook/issues/225 and
            # https://asgi.readthedocs.io/en/latest/specs/www.html#websocket
            pass

    def route(self, rule, *args, **kwargs):
        """Return a decorator registering an ASGI consumer for ``rule``.

        ``rule`` is the exact request path.  The keyword argument
        ``protocol`` is mandatory and must be a key of
        ``self.protocol_router``.  The decorator hands the function back
        unchanged so the decorated name stays usable.

        Raises:
            Exception: when ``protocol`` is not supplied.
        """
        try:
            protocol = kwargs['protocol']
        except KeyError:
            raise Exception(
                    'You must define a protocol type for an ASGI handler'
                    ) from None

        def _route(func):
            self.protocol_router[protocol][rule] = func
            return func

        return _route


def ws_queue(status, client_id, data):
    """Append one message to the websocket queue of ``client_id``.

    The queue (a list stored in the module-level ``ws_data``) is created
    on first use.  Each entry is a dict with the keys ``'status'`` and
    ``'data'`` holding the given arguments.
    """
    entry = {'status': status, 'data': data}
    ws_data.setdefault(client_id, []).append(entry)


def main(argv=sys.argv[1:]):
    """Run the HTTP + WebSocket server described by a configuration file.

    ``argv[0]`` is the path of an INI file.  Its ``[main]`` section must
    provide ``login_timeout`` (seconds, integer), ``module``, ``ip`` and
    ``port``; the whole section is also handed to ``module.init()`` and to
    the Pyramid ``Configurator`` as settings.  Passing any further argument
    skips ``setup_logging()``.

    The object loaded from ``module`` must expose ``init(cf)`` and a
    ``Handler(ip)`` class.  This function calls the handler methods
    ``login``, ``parse`` (awaited), ``get_data``, ``save_message`` and
    ``close``.

    A WebSocket consumer is mounted on ``/ws``; every other request goes to
    the Pyramid WSGI application.  The call blocks inside ``uvicorn.run``.
    """
    conf_file = argv[0]
    # Skip setup_logging when an extra argument is given, so that error
    # messages which have not been caught stay visible.
    if not argv[1:]:
        setup_logging(conf_file)
    log = getLogger('main')
    conf = ConfigParser()
    conf.read(conf_file)
    cf = dict(conf.items('main'))
    login_timeout = int(cf['login_timeout'])
    module = get_module_object(cf['module'])
    module.init(cf)
    # Memory IDs listed here belong to connections that must terminate
    old_clients = []
    # Key client_id, value mem_id of the connection currently serving it
    mem_clients = dict()
    # Configure a normal WSGI app then wrap it with WSGI -> ASGI class
    with Configurator(settings=cf) as config:
        wsgi_app = config.make_wsgi_app()

    app = ExtendedWsgiToAsgi(wsgi_app)

    @app.route('/ws', protocol='websocket')
    async def main_websocket(scope, receive, send):
        """Serve one WebSocket connection until it ends.

        The first text message is passed to ``handler.login()``; later
        messages go to ``handler.parse()``.  Whenever no message arrives
        within 2 seconds the outbound queue of the client is flushed.
        """

        def log_info(msg, func=log.info):
            # Prefix each line with the peer IP, this connection's memory
            # ID and, once known, the client ID.
            if client_id:
                msg = f'{ip} {mem_id} Client {client_id} {msg}'
            else:
                msg = f'{ip} {mem_id} {msg}'
            func(msg)

        def log_unknown_error():
            # Must be called from inside an ``except`` block.
            log_info(exception_message(), log.error)

        def save_message(method: str, d: dict):
            # Best effort: a failure to save is logged, never raised.
            try:
                handler.save_message(method, d)
            except Exception:
                log_unknown_error()

        async def kirim(d: dict):
            # Send a raw ASGI event.
            log_info(f'Send {d}')
            await send(d)

        async def kirim_pesan(d: dict):
            # Send an application message as a JSON text frame.
            text = json.dumps(d)
            save_message('send', d)
            await kirim({'type': 'websocket.send', 'text': text})

        async def run_queue():
            # Flush queued outbound messages, then poll the handler once.
            # The list is looked up a single time: the registry entry may
            # be removed by another connection while this one is awaiting.
            queue = ws_data.get(client_id) if client_id else None
            if queue is None:
                return
            while queue:
                q = queue.pop(0)
                if q['status'] == 'send':
                    await kirim_pesan(q['data'])
            try:
                d = handler.get_data()
                if d:
                    await kirim_pesan(d)
            except Exception:
                log_unknown_error()

        async def login(dc: dict):
            # Authenticate via the handler, send its reply to the peer and
            # return the client ID (None when the login failed).
            cid = None
            try:
                dc = handler.login(dc)
                cid = dc['client_id']
                # mem_clients, not ws_data, tells whether another connection
                # is active: ws_queue() may have created the queue before
                # the client ever connected.
                old_mem_id = mem_clients.get(cid)
                if old_mem_id is not None and old_mem_id != mem_id:
                    old_clients.append(old_mem_id)
                    log_info(
                        f'Koneksi Client {cid} sebelumnya akan diputus',
                        log.warning)
                ws_data.setdefault(cid, [])
                # Always record the newest owner so that a later login
                # evicts this connection and not an older one.
                mem_clients[cid] = mem_id
            except BaseError as e:
                dc = dict(code=e.code, message=e.message, action=e.action)
            except Exception:
                log_unknown_error()
                dc = dict(code=91, message='Login gagal', action='login')
            log_info(f'Encode JSON {dc}')
            await kirim_pesan(dc)
            return cid

        first = True
        ip = scope['client'][0]
        mem_id = id(scope)
        client_id = None
        handler = module.Handler(ip)
        start_time = time()
        try:
            while True:
                if not client_id and time() - start_time > login_timeout:
                    log_info('Login timeout', log.error)
                    break
                if mem_id in old_clients:
                    # A newer connection logged in with the same client ID.
                    log_info('Koneksi berakhir karena sudah ada yang baru')
                    break
                try:
                    async with asyncio.timeout(2):
                        message = await receive()
                except asyncio.TimeoutError:
                    # Idle: use the gap to deliver queued messages.
                    await run_queue()
                    continue
                log_info(f'Receive {message}')
                if message['type'] == 'websocket.connect':
                    await kirim({'type': 'websocket.accept'})
                elif message['type'] == 'websocket.receive':
                    try:
                        d = json.loads(message.get('text'))
                    except (TypeError, ValueError):
                        # Missing text (e.g. a binary frame) or invalid
                        # JSON: log it and keep the connection alive.
                        log_unknown_error()
                        continue
                    log_info(f'Decode JSON {d}')
                    save_message('receive', d)
                    if first:
                        first = False
                        client_id = await login(d)
                        if not client_id:
                            break
                    else:
                        try:
                            await handler.parse(d)
                        except BaseError as e:
                            log_info(e.message, log.error)
                        except Exception:
                            log_unknown_error()
                elif message['type'] == 'websocket.disconnect':
                    break
            # Wait up to 5 seconds for one more event from the peer.
            # NOTE(review): presumably this lets the peer close first;
            # confirm the intent.
            try:
                async with asyncio.timeout(5):
                    await receive()
            except asyncio.TimeoutError:
                pass
        finally:
            # Drop every reference to this connection, also when an
            # exception escapes, so stale memory IDs never linger.
            if mem_id in old_clients:
                old_clients.remove(mem_id)
            # Only the connection that still owns the client ID may remove
            # the registry entries; a superseded one must leave the queue
            # of its successor untouched.
            if client_id and mem_clients.get(client_id) == mem_id:
                del mem_clients[client_id]
                ws_data.pop(client_id, None)
            log_info('Koneksi berakhir')
            try:
                handler.close()
            except Exception:
                log_unknown_error()

    log.info(f"Listen {cf['ip']} port {cf['port']}")
    uvicorn.run(app, host=cf['ip'], port=int(cf['port']), log_level='info')