# agent_job.py
import sys
import os
import json
from time import sleep
from logging import getLogger
from configparser import ConfigParser
from textwrap import wrap
from sqlalchemy import engine_from_config
from sqlalchemy.schema import CreateSchema
from sqlalchemy.orm import sessionmaker
from zope.sqlalchemy import register
import transaction
from ..models import (
    Agent,
    Antrian,
    Selesai,
    Mail,
    File,
    )
from .tools import (
    make_pid_file,
    clean_log,
    file2dict,
    file_time,
    )
from .logger import setup_logging


# Process-wide shared state: main() stores the parsed [main] config section
# under 'conf' and the SQLAlchemy session under 'db_session'.
registry = {}


##############################
# Baca Status Agent (Mailer) #
##############################

def update_agent(d):
    """Store an agent's reported status on its ``Agent`` row.

    ``d`` is the parsed status file. It must provide ``id``, ``status`` and
    ``tgl``, plus ``jawaban`` whenever ``status`` is non-zero.  An agent
    that has no row yet is created with ``jalur=6``.
    """
    session = registry['db_session']
    agent_id = d['id']
    agent = session.query(Agent).filter_by(id=agent_id).first()
    if not agent:
        agent = Agent(id=agent_id, jalur=6)
    status = d['status']
    agent.status = status
    agent.startup = d['tgl']
    # Status 0 clears the last error; any other status records the answer
    # text as the last error.
    agent.lasterr = None if status == 0 else d['jawaban']
    with transaction.manager:
        session.add(agent)


def update_job(d):
    """Record an agent's result for a job on its ``Selesai`` row.

    ``d`` is the parsed result file and must provide ``id``, ``status``,
    ``jawaban`` and ``tgl``.  The matching ``Selesai`` row receives the
    status, the answer and the operator timestamp; if the row's sender
    (``pengirim``) resolves to an ``Agent`` row, that agent's ``lastjob``
    is set to the same timestamp.

    A result whose ``id`` matches no ``Selesai`` row is logged and ignored.
    """
    db_session = registry['db_session']
    job_id = d['id']
    row = db_session.query(Selesai).filter_by(id=job_id).first()
    if not row:
        # Nothing to update.  Passing None to Session.add() would raise and
        # take the whole polling loop down with it.
        log = getLogger('update_job')
        log.warning(f'Job {job_id}: no Selesai row found, result ignored')
        return
    agent = db_session.query(Agent).filter_by(id=row.pengirim).first()
    row.status = d['status']
    row.jawaban = d['jawaban']
    row.tgl_operator = d['tgl']
    if agent:
        agent.lastjob = d['tgl']
    with transaction.manager:
        db_session.add(row)
        if agent:
            db_session.add(agent)


def read_status_file():
    """Apply every file found in the configured ``result_dir`` to the database.

    Files are handled in sorted name order.  Each one is parsed with
    ``file2dict``; when the parsed data has no ``id`` the numeric file name
    prefix is used instead.  ``status.json`` is routed to ``update_agent``,
    anything else to ``update_job``.  A successfully parsed file is removed
    afterwards, even when no ``id`` could be determined for it.

    A file that cannot be parsed is logged and left in place, so it is
    retried on the next pass.
    """
    log = getLogger('read_status_file')
    dir_name = registry['conf']['result_dir']
    for filename in sorted(os.listdir(dir_name)):
        fullpath = os.path.join(dir_name, filename)
        try:
            d = file2dict(fullpath)
        except Exception as e:
            # Skip this entry entirely: continuing would either hit an
            # unbound ``d`` or re-apply the previous file's data.
            log.error(f'Cannot read {fullpath}: {e}')
            continue
        log.info(f'Read {fullpath}: {d}')
        if 'id' not in d:
            # Fall back to the file name, e.g. "123.json" -> id 123.
            prefix = os.path.splitext(filename)[0]
            try:
                d['id'] = int(prefix)
            except ValueError:
                pass
        if 'id' in d:
            d['tgl'] = file_time(fullpath)
            if filename == 'status.json':
                update_agent(d)
            else:
                update_job(d)
        os.remove(fullpath)


##################################
# Kirim Job untuk Agent (Mailer) #
##################################

def write_job_file(s, mail=None):
    """Write job ``s`` as ``<s.id>.json`` inside the configured ``job_dir``.

    The JSON object always carries ``id``, ``penerima`` and ``pesan``.  When
    ``mail`` is given, ``subject`` and ``name`` are added, and the ``File``
    rows with the same id (ordered by ``urutan``) are included under
    ``files`` as ``[filename, content]`` pairs if there are any.
    """
    settings = registry['conf']
    session = registry['db_session']
    log = getLogger()
    job_path = os.path.join(settings['job_dir'], f'{s.id}.json')
    payload = {'id': s.id, 'penerima': s.penerima, 'pesan': s.pesan}
    if mail:
        payload['subject'] = mail.subject
        payload['name'] = mail.name
        attachments = session.query(File).filter_by(id=s.id)
        files = [
            [att.filename, att.content]
            for att in attachments.order_by(File.urutan)
        ]
        if files:
            payload['files'] = files
    log.info(f'Write {job_path} {clean_log(payload)}')
    # Serialise before opening the file so a failure leaves no empty file.
    serialized = json.dumps(payload)
    with open(job_path, 'w') as out:
        out.write(serialized)


def create_mail_row(a):
    """Build a ``Mail`` row for queue entry ``a``.

    The name is the part of the recipient address before the first ``@``;
    the subject is the first 50-column wrapped line of the message, or an
    empty string when the message wraps to nothing.
    """
    local_part = a.penerima.partition('@')[0]
    # The subject is taken from the first fragment of the message.
    fragments = wrap(a.pesan, 50)
    subject = fragments[0] if fragments else ''
    return Mail(id=a.id, name=local_part, subject=subject)


def read_queue():
    """Move every sendable ``Antrian`` row to ``Selesai`` and emit its job.

    Entries with ``kirim=True`` are processed in id order until none are
    left.  For each entry:

    * on channel (``jalur``) 6 the matching ``Mail`` row is looked up, or
      created with ``create_mail_row`` when missing;
    * an agent on the same channel is chosen: the one named in ``pengirim``
      if set, otherwise any agent whose status is 0;
    * the ``Selesai`` copy gets status -1 when no agent was found and -2
      when the chosen agent's status is not 0;
    * the queue row is deleted and the ``Selesai`` row (plus a new ``Mail``
      row) stored in one transaction;
    * a job file is written only when the resulting status is positive.
    """
    log = getLogger('read_queue')
    db_session = registry['db_session']
    q = db_session.query(Antrian).filter_by(kirim=True)
    q_mail = db_session.query(Mail)
    while True:
        a = q.order_by(Antrian.id).first()
        if not a:
            return
        # Reset per entry: a non-mail job must never see an unbound name or
        # the Mail row left over from a previous iteration.
        mail = None
        new_mail = False
        if a.jalur == 6:  # Mail
            mail = q_mail.filter_by(id=a.id).first()
            if not mail:
                mail = create_mail_row(a)
                new_mail = True
        d = a.to_dict()
        s = Selesai(**d)
        q_agent = db_session.query(Agent).filter_by(jalur=a.jalur)
        if s.pengirim:
            q_agent = q_agent.filter_by(id=s.pengirim)
        else:
            q_agent = q_agent.filter_by(status=0)
        agent = q_agent.first()
        if agent:
            s.pengirim = agent.id
            if agent.status != 0:
                s.status = -2
                log.error(f'Antrian {a.id}: {agent.id} belum aktif')
        else:
            s.status = -1  # No active agent available
            log.error(f'Antrian {a.id}: tidak ada pengirim yang aktif')
        with transaction.manager:
            q.filter_by(id=a.id).delete()
            db_session.add(s)
            if new_mail:
                db_session.add(mail)
            # Flush, then detach everything before the commit so that ``s``
            # and ``mail`` are still readable afterwards.
            db_session.flush()
            db_session.expunge_all()
        if s.status > 0:
            write_job_file(s, mail)


def main(argv=sys.argv[1:]):
    """Run the polling loop: read agent results, then dispatch queued jobs.

    ``argv[0]`` is the path of an INI file whose ``[main]`` section supplies
    ``pid_file``, the ``db_``-prefixed SQLAlchemy engine settings and the
    directories used by the other functions in this module.

    Exits with a non-zero status when the argument is missing, when the
    configuration file cannot be read, or when ``make_pid_file`` reports
    failure.  Otherwise it loops forever, sleeping one second per pass.
    """
    if not argv:
        sys.exit(f'Usage: {os.path.basename(sys.argv[0])} CONFIG_FILE')
    conf_file = argv[0]
    conf = ConfigParser()
    # ConfigParser.read() silently skips unreadable files and returns the
    # list of files it did parse, so an empty list means nothing was loaded.
    if not conf.read(conf_file):
        sys.exit(f'Cannot read configuration file {conf_file}')
    registry['conf'] = cf = dict(conf.items('main'))
    if not make_pid_file(cf['pid_file']):
        sys.exit(1)
    setup_logging(conf_file)
    engine = engine_from_config(cf, 'db_')
    factory = sessionmaker(bind=engine)
    registry['db_session'] = db_session = factory()
    # Tie the session to the ``transaction`` package's transaction manager.
    register(db_session)
    while True:
        read_status_file()
        read_queue()
        sleep(1)