agent_job.py
5.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import sys
import os
import json
from time import sleep
from logging import getLogger
from configparser import ConfigParser
from textwrap import wrap
from sqlalchemy import engine_from_config
from sqlalchemy.schema import CreateSchema
from sqlalchemy.orm import sessionmaker
from zope.sqlalchemy import register
import transaction
from ..models import (
Agent,
Antrian,
Selesai,
Mail,
File,
)
from .tools import (
make_pid_file,
clean_log,
file2dict,
file_time,
)
from .logger import setup_logging
# Module-wide holder for shared runtime objects; main() stores the parsed
# settings under 'conf' and the SQLAlchemy session under 'db_session'.
registry = {}
##############################
# Read Agent (Mailer) Status #
##############################
def update_agent(d):
    """Create or refresh the Agent row described by a status dict.

    ``d`` must provide ``id``, ``status`` and ``tgl``.  ``jawaban`` is read
    only when ``status`` is non-zero and is stored as the agent's last
    error.  An agent that does not exist yet is created with ``jalur=6``.
    """
    session = registry['db_session']
    agent = session.query(Agent).filter_by(id=d['id']).first()
    if not agent:
        agent = Agent(id=d['id'], jalur=6)
    status = d['status']
    agent.status = status
    agent.startup = d['tgl']
    # A zero status clears any previously recorded error.
    agent.lasterr = None if status == 0 else d['jawaban']
    with transaction.manager:
        session.add(agent)
def update_job(d):
    """Record an agent's result for a job.

    Looks up the Selesai row whose id is ``d['id']`` and copies ``status``,
    ``jawaban`` and ``tgl`` from ``d`` onto it.  When the row's sender
    (``pengirim``) matches an Agent row, that agent's ``lastjob`` is set to
    ``d['tgl']`` as well.  If no Selesai row matches, a warning is logged
    and nothing is written.
    """
    db_session = registry['db_session']
    row = db_session.query(Selesai).filter_by(id=d['id']).first()
    if not row:
        # Without this guard the code below would try to update and add
        # None to the session, which raises and stops the caller's loop.
        getLogger('update_job').warning(f"Selesai {d['id']}: not found")
        return
    agent = db_session.query(Agent).filter_by(id=row.pengirim).first()
    row.status = d['status']
    row.jawaban = d['jawaban']
    row.tgl_operator = d['tgl']
    if agent:
        agent.lastjob = d['tgl']
    with transaction.manager:
        db_session.add(row)
        if agent:
            db_session.add(agent)
def read_status_file():
    """Apply every result file found in the configured ``result_dir``.

    Files are handled in sorted name order.  Each one is parsed with
    ``file2dict``; when the parsed dict has no ``id``, the file name without
    its extension is used if it is an integer.  A dict with an ``id`` gets
    the file's time stored under ``tgl`` and is passed to ``update_agent``
    (for ``status.json``) or ``update_job`` (any other name).  The file is
    then removed.

    A file that cannot be parsed is logged and left in place, so it is seen
    again on the next call instead of being processed with another file's
    data.
    """
    log = getLogger('read_status_file')
    dir_name = registry['conf']['result_dir']
    for filename in sorted(os.listdir(dir_name)):
        fullpath = os.path.join(dir_name, filename)
        try:
            d = file2dict(fullpath)
        except Exception:
            # file2dict is opaque here, so the catch stays broad; skipping
            # avoids reusing the previous iteration's dict (or hitting an
            # unbound name on the first file).
            log.exception(f'Cannot read {fullpath}')
            continue
        log.info(f'Read {fullpath}: {d}')
        if 'id' not in d:
            prefix = os.path.splitext(filename)[0]
            try:
                d['id'] = int(prefix)
            except ValueError:
                # Name is not numeric; the file carries no usable id.
                pass
        if 'id' in d:
            d['tgl'] = file_time(fullpath)
            if filename == 'status.json':
                update_agent(d)
            else:
                update_job(d)
        # NOTE(review): removal is assumed to apply to every parsed file,
        # including those without a usable id -- confirm.
        os.remove(fullpath)
##################################
# Send Job to the Agent (Mailer) #
##################################
def write_job_file(s, mail=None):
    """Write the job described by ``s`` as ``<job_dir>/<s.id>.json``.

    The JSON object always holds ``id``, ``penerima`` and ``pesan`` taken
    from ``s``.  When ``mail`` is given, its ``subject`` and ``name`` are
    added.  File rows whose id equals ``s.id`` are added under ``files`` as
    ``[filename, content]`` pairs ordered by ``urutan``; the key is omitted
    when there are none.
    """
    settings = registry['conf']
    session = registry['db_session']
    root_log = getLogger()
    path = os.path.join(settings['job_dir'], f'{s.id}.json')
    payload = {'id': s.id, 'penerima': s.penerima, 'pesan': s.pesan}
    if mail:
        payload['subject'] = mail.subject
        payload['name'] = mail.name
    # NOTE(review): the attachment lookup is assumed to run for every job,
    # not only when ``mail`` is given -- confirm against the original layout.
    attachments = session.query(File).filter_by(id=s.id).order_by(File.urutan)
    files = [[row.filename, row.content] for row in attachments]
    if files:
        payload['files'] = files
    root_log.info(f'Write {path} {clean_log(payload)}')
    # Serialise before opening so a serialisation error creates no file.
    text = json.dumps(payload)
    with open(path, 'w') as out:
        out.write(text)
def create_mail_row(a):
    """Build a Mail row for queue entry ``a``.

    The name is the part of ``a.penerima`` before the first ``@``.  The
    subject is the first line produced by wrapping ``a.pesan`` at 50
    columns, or an empty string when wrapping yields no lines.
    """
    recipient_name = a.penerima.partition('@')[0]
    # The subject is taken from the leading fragment of the message.
    lines = wrap(a.pesan, 50)
    subject = lines[0] if lines else ''
    return Mail(id=a.id, name=recipient_name, subject=subject)
def read_queue():
    """Move every sendable queue entry to Selesai and hand it to an agent.

    Antrian rows with ``kirim=True`` are taken one at a time in id order
    until none remain.  For each entry:

    * when ``jalur`` is 6 (mail), the matching Mail row is loaded, or a new
      one is built with ``create_mail_row``;
    * a Selesai row is built from ``a.to_dict()``;
    * an Agent with the same ``jalur`` is chosen: the one named by
      ``pengirim`` if set, otherwise the first with ``status=0``.  The
      Selesai status becomes -2 when the chosen agent's status is not 0,
      and -1 when no agent is found;
    * the Antrian row is deleted and the Selesai row (plus a newly built
      Mail row) is stored in one transaction;
    * a job file is written only when the Selesai status is above zero.
    """
    log = getLogger('read_queue')
    db_session = registry['db_session']
    q = db_session.query(Antrian).filter_by(kirim=True)
    q_mail = db_session.query(Mail)
    while True:
        a = q.order_by(Antrian.id).first()
        if not a:
            return
        # Reset per entry: otherwise a non-mail entry would reuse the Mail
        # row of the previous entry, or hit an unbound name on the first one.
        mail = None
        new_mail = False
        if a.jalur == 6:  # Mail
            mail = q_mail.filter_by(id=a.id).first()
            if not mail:
                mail = create_mail_row(a)
                new_mail = True
        d = a.to_dict()
        s = Selesai(**d)
        q_agent = db_session.query(Agent).filter_by(jalur=a.jalur)
        if s.pengirim:
            q_agent = q_agent.filter_by(id=s.pengirim)
        else:
            q_agent = q_agent.filter_by(status=0)
        agent = q_agent.first()
        if agent:
            s.pengirim = agent.id
            if agent.status != 0:
                s.status = -2
                log.error(f'Antrian {a.id}: {agent.id} belum aktif')
        else:
            s.status = -1  # No active agent available
            log.error(f'Antrian {a.id}: tidak ada pengirim yang aktif')
        with transaction.manager:
            q.filter_by(id=a.id).delete()
            db_session.add(s)
            if new_mail:
                db_session.add(mail)
            db_session.flush()
            # Detach the objects so ``s`` and ``mail`` can still be read
            # once the transaction block has ended.
            db_session.expunge_all()
        if s.status > 0:
            write_job_file(s, mail)
def main(argv=sys.argv[1:]):
    """Run the job loop using the configuration file named in ``argv[0]``.

    Reads the ``main`` section of the file into ``registry['conf']``, exits
    with status 1 when ``make_pid_file`` reports failure, configures
    logging, creates the database session from the ``db_``-prefixed
    settings and stores it in ``registry['db_session']``.  It then loops
    forever: read result files, process the queue, sleep one second.
    """
    conf_file = argv[0]
    parser = ConfigParser()
    parser.read(conf_file)
    settings = dict(parser.items('main'))
    registry['conf'] = settings
    pid_created = make_pid_file(settings['pid_file'])
    if not pid_created:
        sys.exit(1)
    setup_logging(conf_file)
    session_factory = sessionmaker(bind=engine_from_config(settings, 'db_'))
    session = session_factory()
    registry['db_session'] = session
    # Tie the session to the ``transaction`` manager used by the job code.
    register(session)
    while True:
        read_status_file()
        read_queue()
        sleep(1)