connection.py 5.72 KB
import sys
import socket
import select
from time import (
    time,
    sleep,
    )


def join_ip_port(ip, port):
    return ':'.join([ip, str(port)])


class Connection:
    def __init__(self, conf=None):
        self.conf = conf
        self.connected_time = None 
        self.running = False
        self.created_time = time()

    def process(self, raw):
        return self.job.process(raw)

    def get_streamer(self):
        cls = self.conf['streamer_cls']
        return cls()

    def is_loop(self):
        sleep(1)
        return self.running

    def set_timeout(self):
        try:
            self.request.settimeout(self.get_timeout())
        except OSError:
            pass

    def before_loop(self):
        self.set_timeout()
        self.streamer = self.get_streamer()
        self.running = True
        self.busy = False
        self.create_job()

    def get_receive_size(self):
        return 2048

    def get_timeout(self):
        return self.conf['timeout']

    def run(self):
        self.before_loop()
        while self.is_loop():
            if not self.connected_time:
                continue
            self.busy = True
            self.on_loop()
            self.busy = False

    def on_loop(self):
        raw = self.receive_raw()
        if not raw:
            if self.is_timeout():
                self.close_because_timeout()
            return
        while True:
            raw = self.streamer.get(raw)
            if not raw:
                return
            raw = self.process(raw)
            if raw:
                self.send(raw)
            raw = ''

    def receive_raw(self):
        try:
            ready, _, _ = select.select([self.request], [], [], 5)
            raw = ready and self.request.recv(
                    self.get_receive_size(), socket.MSG_DONTWAIT) or ''
            if isinstance(raw, bytes):
                raw = raw.decode('utf-8')
            if raw:
                self.on_receive_raw(raw)
            return raw
        except socket.error as err:
            self.on_socket_error(err)
        except socket.timeout as err:
            self.on_socket_error(err)

    def is_connected(self):
        return self.connected_time

    def set_connected_time(self):
        self.connected_time = time()

    def on_receive_raw(self, raw):
        self.set_connected_time()
        self.job.on_receive_raw(raw)

    def close_because_timeout(self):
        self.close()

    def is_timeout(self):
        if self.connected_time:
            return time() - self.connected_time > self.get_timeout()
        return True

    def raw_for_send(self, raw):
        return self.streamer.set(raw)

    def send(self, raw):
        raw = self.raw_for_send(raw)
        self.just_send(raw)

    def just_send(self, raw):
        if sys.version_info.major > 2:
            raw = raw.encode('utf-8')
        try:
            self.request.sendall(raw)
        except socket.error as err:
            self.on_socket_error(err)

    def on_socket_error(self, err):
        self.close()

    def stop_loop(self):
        self.running = False
        self.connected_time = None

    def close(self):
        self.stop_loop()
        try:
            self.request.settimeout(1)
            self.request.close()
        except socket.error:
            pass

    def create_job(self):
        module = self.conf['module_obj']
        self.job = module.Job(self)


class ConnectionManager:
    def __init__(self):
        # Kumpulan class Connection 
        self.conns = []

    def count(self, ip_port):
        count = 0
        for this_ip_port, x in self.conns:
            if this_ip_port == ip_port:
                count += 1
        return count

    def get_first(self, ip_port):
        index = -1
        for this_ip_port, conn in self.conns:
            index += 1
            if this_ip_port == ip_port:
                return conn, index
        return None, None

    def get_last(self, ip_port):
        index = -1
        found_conn = found_index = None
        for this_ip_port, conn in self.conns:
            index += 1
            if this_ip_port == ip_port:
                found_conn = conn 
                found_index = index
        return found_conn, found_index

    def add(self, conn):
        ip_port = join_ip_port(conn.conf['ip'], conn.conf['port'])
        old_conn, index = self.get_first(ip_port)
        self.conns.append([ip_port, conn])
        count = self.count(ip_port)
        if old_conn:
            # Hanya boleh satu connection, jadi tutup sebelumnya
            self.close_old_connection(old_conn)

    def close_old_connection(self, old_conn):
        old_conn.close()
        sleep(2)

    def remove(self, index):
        ip_port, conn = self.conns[index]
        del self.conns[index][0]  # ip_port
        del self.conns[index][0]  # connection object
        del self.conns[index]
        sleep(0.1)
        count = self.count(ip_port)

    def remove_if_old(self, index):
        ip_port, conn = self.conns[index]
        if time() - conn.created_time > 5:
            self.remove(index)
            return True

    def __iter__(self):  # for loop
        for ip_port, conn in self.conns:
            yield [ip_port, conn]

    def __contains__(self, ip_port):  # in operator
        conn, index = self.get_first(ip_port)
        if index is None:
            return
        return index > -1

    def __getitem__(self, s):
        if isinstance(s, str):  # key like dictionary
            ip_port = s
            conn, index = self.get_last(ip_port)
            return conn
        return self.conns[s]  # slice like list