add user_id to TextPrinters model and update print_text function to use user_id

1 parent 57dbd96c
......@@ -338,6 +338,7 @@ def main(global_config, **settings):
BASE_CLASS.static_view(config, settings=settings)
BASE_CLASS.single_device = settings.get(
"single_device", "false").lower() == "true"
BASE_CLASS.is_pylpr = settings.get("is_pylpr", "false").lower() == "true"
config.scan(".")
# _logging.debug(config)
return config.make_wsgi_app()
......@@ -518,6 +519,7 @@ class BaseApp():
self.base_dir = os.path.split(__file__)[0]
self.reg_nip = 0
self.single_device = "false"
self.is_pylpr = False
def get_route_file(self, filename="routes.csv"):
fullpath = os.path.join(self.base_dir, 'scripts', 'data', filename)
......
"""Initial table creation
Revision ID: 021fd30ba8a8
Revises: 663de8bbd563
Create Date: 2026-05-01 16:13:34.909164
"""
# revision identifiers, used by Alembic.
revision = '021fd30ba8a8'
down_revision = '663de8bbd563'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###
"""tambah user id pada text-printers
Revision ID: 975f107434c6
Revises: 021fd30ba8a8
Create Date: 2026-05-05 14:04:05.260742
"""
# revision identifiers, used by Alembic.
revision = '975f107434c6'
down_revision = '021fd30ba8a8'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('text_printers', sa.Column('user_id', sa.Integer(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('text_printers', 'user_id')
# ### end Alembic commands ###
......@@ -9,3 +9,4 @@ class TextPrinters(Base, NamaModel):
queue = Column(String(16), default='lp')
port = Column(Integer(), default=515)
timeout = Column(Integer(), default=10)
user_id = Column(Integer(), nullable=False)
......@@ -982,6 +982,7 @@ class BaseView(object):
values = dict(c)
row = self.save_request(values)
kwargs["values"] = values
return self.after_add(row=row, **kwargs)
elif "cancel" in self.req.POST or 'batal' in self.req.POST or "close" in self.req.POST:
self.cancel_act()
......
......@@ -15,68 +15,94 @@ SESS_ADD_FAILED = 'Tambah partner gagal'
SESS_EDIT_FAILED = 'Edit partner gagal'
def print_text(user, text):
def print_text(user_id, text=None, filename=None):
from opensipkd.base.models import TextPrinters
printer = TextPrinters.query().filter_by(create_uid=user.id, status=1).first()
printer = TextPrinters.query().filter_by(user_id=user_id, status=1).first()
if not printer:
log.error(
f"User {user.id} does not have an active printer configured.")
f"User {user_id} does not have an active printer configured.")
raise Exception("No active printer configured for the user.")
# Replace with your printer's IP
hostname = printer.kode if printer else "127.0.0.1"
ip = printer.kode if printer else "127.0.0.1"
ips = ip.split(".") # Support multiple IPs separated by commas
ips = [int(ip.strip()) for ip in ips if ip.strip()] # Clean up whitespace and empty entries
hostname = ".".join(map(str, ips)) if ips else "127.0.0.1"
queue = printer.queue if printer else "lp" # Common default queue name
port = printer.port if printer else 515 # Default LPR port
timeout = printer.timeout if printer else 10 # Timeout in seconds
# Set to True if the printer is an Epson model that requires specific control codes
is_epson = printer.is_epson if printer else False
from pylpr import LprClient
with LprClient(
hostname=hostname,
port=port,
timeout=timeout,
queue=queue,
# recv_buffer=args.recv_buffer,
# username=args.username,
# job_name=args.job_name,
# file_name=getattr(args.print_file, 'name', None) if hasattr(
# args, 'print_file') and args.print_file else None,
# label=args.label,
# use_reserved_port=args.use_reserved_port
is_epson = bool(printer.is_epson) if printer else False
from opensipkd.base import BASE_CLASS
from opensipkd.tools import get_random_string
import os
if not BASE_CLASS.is_pylpr:
import platform
current_os = platform.system().lower()
if current_os == "windows":
cmd = 'lpr'
else: # Darwin is macOS
cmd = 'rlpr'
if text and not filename:
filename = os.path.join(BASE_CLASS.temp_files,
get_random_string(16) + ".txt")
with open(filename, "w", encoding="utf-8") as f:
f.write(text)
try:
import subprocess
subprocess.run(
[cmd, f"--printer={queue}@{hostname}:{port}", filename], check=True)
except subprocess.CalledProcessError as e:
log.error(f"Failed to print using {cmd}: {e}")
raise Exception(f"Failed to print the document. {e}, {hostname}, {port}, {queue}") from e
else:
from pylpr import LprClient
with LprClient(
hostname=hostname,
port=port,
timeout=timeout,
queue=queue,
# recv_buffer=args.recv_buffer,
# username=args.username,
# job_name=args.job_name,
# file_name=getattr(args.print_file, 'name', None) if hasattr(
# args, 'print_file') and args.print_file else None,
# label=args.label,
# use_reserved_port=args.use_reserved_port
) as lpr:
# if args.print_queue:
# # Send the 'Print any waiting jobs' command (0x01) to the specified queue
# lpr._send_lpr_command(1, queue.encode('ascii'))
# print(
# f"Sent 'Print any waiting jobs' command to queue '{args.queue}'")
# return
# if args.status:
# lpr._send_lpr_command(
# 3, (args.queue + " " + args.status).encode('ascii'), noreceive=True)
# print(
# f"Sent 'Send queue state (short)' command with attributes '{args.status}' to queue '{args.queue}'")
# # Receive and print the response from the printer
# response = lpr.receive()
# print("Received queue state (short) response:")
# print(response.decode('utf-8', errors='replace'))
# return
# if args.longstatus:
# lpr._send_lpr_command(
# 4, (args.queue + " " + args.longstatus).encode('ascii'), noreceive=True)
# print(
# f"Sent 'Send queue state (long)' command with attributes '{args.longstatus}' to queue '{args.queue}'")
# # Receive and print the response from the printer
# response = lpr.receive()
# print("Received queue state (long) response:")
# print(response.decode('utf-8', errors='replace'))
# return
# if args.remove:
# # Send the 'Remove jobs' command (0x05) to the specified queue
# lpr._send_lpr_command(
# 5, (args.queue + " " + args.remove).encode('ascii'))
# print(
# f"Sent 'Remove jobs' command with attributes '{args.remove}' to queue '{args.queue}'")
# return
# if args.print_queue:
# # Send the 'Print any waiting jobs' command (0x01) to the specified queue
# lpr._send_lpr_command(1, queue.encode('ascii'))
# print(
# f"Sent 'Print any waiting jobs' command to queue '{args.queue}'")
# return
# if args.status:
# lpr._send_lpr_command(
# 3, (args.queue + " " + args.status).encode('ascii'), noreceive=True)
# print(
# f"Sent 'Send queue state (short)' command with attributes '{args.status}' to queue '{args.queue}'")
# # Receive and print the response from the printer
# response = lpr.receive()
# print("Received queue state (short) response:")
# print(response.decode('utf-8', errors='replace'))
# return
# if args.longstatus:
# lpr._send_lpr_command(
# 4, (args.queue + " " + args.longstatus).encode('ascii'), noreceive=True)
# print(
# f"Sent 'Send queue state (long)' command with attributes '{args.longstatus}' to queue '{args.queue}'")
# # Receive and print the response from the printer
# response = lpr.receive()
# print("Received queue state (long) response:")
# print(response.decode('utf-8', errors='replace'))
# return
# if args.remove:
# # Send the 'Remove jobs' command (0x05) to the specified queue
# lpr._send_lpr_command(
# 5, (args.queue + " " + args.remove).encode('ascii'))
# print(
# f"Sent 'Remove jobs' command with attributes '{args.remove}' to queue '{args.queue}'")
# return
if is_epson:
lpr.send(
lpr.EXIT_PACKET_MODE
......@@ -85,9 +111,16 @@ def print_text(user, text):
+ lpr.FF
)
else:
lpr.send(text.encode('utf-8'))
try:
lpr.send(text.encode('utf-8'))
except Exception as e:
log.error(f"Failed to send print job: {e}")
raise Exception(
f"Failed to print the document. {e}, {hostname}, {port}, {queue}") from e
# lpr.send(text.encode('utf-8'))
class AddSchema(NamaSchema):
nama = colander.SchemaNode(
colander.String(),
......@@ -117,7 +150,12 @@ class AddSchema(NamaSchema):
colander.Integer(),
title="Timeout",
default=10)
user_id = colander.SchemaNode(
colander.Integer(),
widget=widget.SelectWidget(),
title="User ID",
default=10)
status = colander.SchemaNode(
colander.Integer(),
widget=widget.CheckboxWidget(true_val="1", false_val="0"),
......@@ -127,13 +165,14 @@ class AddSchema(NamaSchema):
schema["kode"].title = "IP Address"
schema["kode"].widget = widget.TextInputWidget(
mask="999.999.999.999", # Use specialized mask format
# If required
mask_mapping={'Z': {'pattern': '[0-9]', 'optional': True}},
# css_class='ip_address'
)
# schema["kode"].validator = colander.Regex(
# r'^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$'
# )
from opensipkd.base.models import User
schema["user_id"].widget.values = User.get_list() # Use a simple text input for user_id
request = kw.get('request')
if request and not request.has_permission('admin'):
schema["user_id"].widget = widget.HiddenWidget()
schema["user_id"].default = request.user.id
class EditSchema(AddSchema):
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!