Add helper functions for Alembic migrations and a script to clean old captcha files

- Implemented `has_table`, `table_has_column`, and `fields_update` functions in `alembic\helpers.py` to facilitate checking for table and column existence during migrations.
- Created `cron_remove_captcha.py` script to delete captcha files older than a specified number of days, enhancing file management.
- Added a new CSV file `mob_routes.csv` with headers for code, path, name, and img for future data integration.
1 parent e20d00ed
5.0.1
- Bug Saat get captcha session tidak berfungsi dimana ditambahkan checking melalui
file jpg yang tersimpan dalam folder captcha files
- Penambahan osipkd-remove-captcha yang harus dijalankan melalui cron setiap
end of day
- Pemindahan alembic dari dalam scripts jadi selevel dengan scripts
5.0.0
4.0.1 2025-08-23
Penambahan obj2json pada api_base
......
No preview for this file type
......@@ -630,7 +630,7 @@ BASE_CLASS = BaseApp()
def has_permission_(request, perm_names, context=None):
if not perm_names:
return False
return True
if isinstance(perm_names, str):
perm_names = [perm_names]
for perm_name in perm_names:
......
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
"""Pyramid bootstrap environment. """
import logging
import os
import importlib.machinery
from alembic import context
from pyramid.paster import (
get_appsettings,
setup_logging,
)
from sqlalchemy import engine_from_config
from opensipkd.base.models.meta import Base
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
setup_logging(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
settings = get_appsettings(config.config_file_name)
logging.info(settings)
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
current_dir = os.path.split(__file__)[0]
helper_file = os.path.join(current_dir, 'helpers.py')
loader = importlib.machinery.SourceFileLoader('alembic_helpers', helper_file)
helpers = loader.load_module()
version_table = 'alembic_models'
version_table_schema = 'public'
def run_migrations_offline() -> None:
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
......@@ -38,41 +39,37 @@ def run_migrations_offline() -> None:
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
context.configure(url=settings['sqlalchemy.url'],
version_table=version_table)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
engine = engine_from_config(settings, prefix='sqlalchemy.')
connection = engine.connect()
context.configure(
connection=connection,
target_metadata=target_metadata,
helpers=helpers,
version_table=version_table,
version_table_schema=version_table_schema
)
try:
with context.begin_transaction():
context.run_migrations()
finally:
connection.close()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
run_migrations_online()
\ No newline at end of file
# http://www.derstappen-it.de/tech-blog/sqlalchemie-alembic-check-if-table-has-column
from alembic import op
from sqlalchemy.engine import reflection
import sqlalchemy as sa
def has_table(table, schema=None, insp=None):
if not insp:
engine = op.get_bind()
insp = reflection.Inspector.from_engine(engine)
return insp.has_table(table, schema=schema)
def table_has_column(table, column, schema=None):
engine = op.get_bind()
insp = reflection.Inspector.from_engine(engine)
has_column = False
if has_table(table, schema, insp):
for col in insp.get_columns(table, schema=schema):
if column != col['name']:
continue
has_column = True
return has_column
def fields_update(table, field, typ, schema="public"):
context = op.get_context()
helpers = context.opts['helpers']
if not helpers.table_has_column(table, field, schema):
op.add_column(table,
# http://www.derstappen-it.de/tech-blog/sqlalchemie-alembic-check-if-table-has-column
from alembic import op
from sqlalchemy.engine import reflection
import sqlalchemy as sa
def has_table(table, schema=None, insp=None):
if not insp:
engine = op.get_bind()
insp = reflection.Inspector.from_engine(engine)
return insp.has_table(table, schema=schema)
def table_has_column(table, column, schema=None):
engine = op.get_bind()
insp = reflection.Inspector.from_engine(engine)
has_column = False
if has_table(table, schema, insp):
for col in insp.get_columns(table, schema=schema):
if column != col['name']:
continue
has_column = True
return has_column
def fields_update(table, field, typ, schema="public"):
context = op.get_context()
helpers = context.opts['helpers']
if not helpers.table_has_column(table, field, schema):
op.add_column(table,
sa.Column(field, typ), schema=schema)
\ No newline at end of file
......@@ -5,22 +5,20 @@ Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade() -> None:
def downgrade():
${downgrades if downgrades else "pass"}
Generic single-database configuration.
\ No newline at end of file
"""Pyramid bootstrap environment. """
import logging
import os
import importlib.machinery
from alembic import context
from pyramid.paster import (
get_appsettings,
setup_logging,
)
from sqlalchemy import engine_from_config
from opensipkd.base.models.meta import Base
config = context.config
setup_logging(config.config_file_name)
settings = get_appsettings(config.config_file_name)
logging.info(settings)
target_metadata = Base.metadata
current_dir = os.path.split(__file__)[0]
helper_file = os.path.join(current_dir, 'helpers.py')
loader = importlib.machinery.SourceFileLoader('alembic_helpers', helper_file)
helpers = loader.load_module()
version_table = 'alembic_models'
version_table_schema = 'public'
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
context.configure(url=settings['sqlalchemy.url'],
version_table=version_table)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
engine = engine_from_config(settings, prefix='sqlalchemy.')
connection = engine.connect()
context.configure(
connection=connection,
target_metadata=target_metadata,
helpers=helpers,
version_table=version_table,
version_table_schema=version_table_schema
)
try:
with context.begin_transaction():
context.run_migrations()
finally:
connection.close()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
\ No newline at end of file
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}
import sys
import os
import time
from pyramid.paster import (get_appsettings, setup_logging, )
def usage(argv):
cmd = os.path.basename(argv[0])
print('usage: %s <config_uri>\n'
'(example: "%s development.ini")' % (cmd, cmd))
sys.exit(1)
def delete_file_if_older(file_path, days_old):
"""
Deletes a file if it exists and its creation time is older than the specified number of days.
Args:
file_path (str): The full path to the file.
days_old (int): The number of days after which a file is considered old enough to be deleted.
"""
if os.path.exists(file_path):
creation_time_timestamp = os.path.getctime(file_path)
current_time_timestamp = time.time()
# Calculate the age of the file in seconds
file_age_seconds = current_time_timestamp - creation_time_timestamp
# Convert the desired age in days to seconds
threshold_seconds = days_old * 24 * 60 * 60
if file_age_seconds > threshold_seconds:
try:
os.remove(file_path)
print(f"Deleted: {file_path} (created {days_old} days ago or more)")
except OSError as e:
print(f"Error deleting file {file_path}: {e}")
else:
print(f"File {file_path} exists but is not older than {days_old} days.")
else:
print(f"File not found: {file_path}")
def main(argv=sys.argv):
if len(argv) < 2:
usage(argv)
config_uri = argv[1]
setup_logging(config_uri)
settings = get_appsettings(config_uri)
captcha_files = settings.get('captcha_files', '/tmp/captcha')
all_entries = os.listdir(captcha_files)
# Filter for only files
files_only = [f for f in all_entries if os.path.isfile(
os.path.join(captcha_files, f))]
print(f"Files in '{captcha_files}':")
for file_name in files_only:
delete_file_if_older(os.path.join(captcha_files, file_name), 1)
code,path,name,img
\ No newline at end of file
......@@ -116,7 +116,7 @@ class BaseView(object):
self.form_widget = None
# self.list_schema = colander.Schema()
# self.add_schema = colander.Schema()
self.add_schema = colander.Schema()
# self.edit_schema = colander.Schema()
self.upload_schema = UploadSchema
......@@ -606,7 +606,7 @@ class BaseView(object):
def returned_form(self, form, **kwargs):
table = kwargs.get("table", None)
if self.req.is_xhr:
if self.req.is_xhr and self.req.params.get("html","false")=="false":
data = form.cstruct
if "captcha" in form:
kode_captcha, file_name = img_captcha(self.req)
......@@ -853,8 +853,8 @@ class BaseView(object):
if isinstance(f.typ, colander.Date):
e.cstruct[f.name] = date_from_str(
e.cstruct[f.name])
if f.name == "captcha":
e.cstruct[f.name] = self.get_captcha_url()
# if f.name == "captcha":
# e.cstruct[f.name] = self.get_captcha_url()
value = self.update_value(value, e.cstruct)
form.set_appstruct(value)
kwargs["table"]=table
......
......@@ -27,13 +27,13 @@ class ListSchema(colander.Schema):
class AddSchema(colander.Schema):
user_id = colander.SchemaNode(
colander.Integer(),
widget=widget.SelectWidget(values=User.get_list()),
# widget=widget.SelectWidget(values=User.get_list()),
oid="user_id",
title="User",
)
desa_id = colander.SchemaNode(
colander.Integer(),
widget=widget.SelectWidget(values=ResDesa.get_list()),
# widget=widget.SelectWidget(values=ResDesa.get_list()),
oid="desa_id",
title="Kelurahan/Desa", )
......
import os
from iso8601.iso8601 import ISO8601_REGEX
from deform.widget import string_types
import json
......@@ -13,7 +14,7 @@ from deform.widget import (
Widget, _StrippedString, Select2Widget, _normalize_choices, OptGroup,
DateInputWidget as WidgetDateInputWidget, AutocompleteInputWidget)
from opensipkd.tools.captcha import img_captcha
from opensipkd.tools import get_settings
_logging = logging.getLogger(__name__)
......@@ -372,14 +373,15 @@ class CaptchaWidget(Widget):
request = None
url = ""
def __init__(self, **kw):
super(CaptchaWidget, self).__init__(**kw)
# def __init__(self, **kw):
# super(CaptchaWidget, self).__init__(**kw)
def serialize(self, field, cstruct, **kw):
file_name = ""
if not cstruct:
kode_captcha, file_name = img_captcha(self.request)
self.request.session["captcha"] = kode_captcha
cstruct = cstruct or self.url+file_name
readonly = kw.get("readonly", self.readonly)
template = readonly and self.readonly_template or self.template
......@@ -395,8 +397,18 @@ class CaptchaWidget(Widget):
pstruct = pstruct.strip()
if not pstruct:
return null
if pstruct != self.request.session["captcha"]:
raise Invalid(field.schema, "Captcha tidak sesuai")
settings = get_settings()
captcha_message = "Captcha tidak sesuai"
captcha_session = self.request.session.get("captcha", "")
if captcha_session:
if pstruct != captcha_session:
raise Invalid(field.schema, captcha_message)
else:
captcha_file = os.path.join(settings['captcha_files'], pstruct)
captcha_exists = os.path.exists(captcha_file)
if not captcha_exists:
raise Invalid(field.schema, captcha_message)
return pstruct
......
......@@ -69,3 +69,4 @@ main = 'opensipkd.base:main'
[project.scripts]
osipkd-db-init = 'opensipkd.base.scripts.initializedb:main'
osipkd-remove-captcha = 'opensipkd.base.scripts.cron_remove_captcha:main'
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!