# idg_letter_category.py
import logging
from datetime import datetime, timedelta

import pytz
from odoo import models, api, fields, _

# from odoo.addons.base.models.ir_sequence import (_create_sequence, _drop_sequences, _alter_sequence, _select_nextval,
#                                                  _update_nogap)
# from odoo.tools import sql
from odoo.exceptions import UserError
from psycopg2 import sql

_logger = logging.getLogger(__name__)


def _create_sequence(cr, seq_name, number_increment, number_next):
    """Create a PostgreSQL sequence.

    :param cr: database cursor used to run the statement
    :param seq_name: name of the sequence; it is inserted into the statement
        text as-is, so it must be a trusted, internally generated identifier
    :param number_increment: step of the sequence, must not be zero
    :param number_next: first value the sequence will hand out
    :raises UserError: if ``number_increment`` is zero
    """
    if number_increment == 0:
        raise UserError(_('Step must not be zero.'))
    # Only the identifier is spliced into the text; both numbers stay bound
    # parameters handled by the database driver.
    statement = "CREATE SEQUENCE {} INCREMENT BY %s START WITH %s".format(seq_name)
    cr.execute(statement, (number_increment, number_next))


def _drop_sequences(cr, seq_names):
    """Drop the given PostgreSQL sequences if they exist.

    :param cr: database cursor used to run the statement
    :param seq_names: iterable of sequence names; each one is quoted as an
        SQL identifier
    :return: None; nothing is executed when ``seq_names`` is empty
    """
    seq_names = list(seq_names)
    if not seq_names:
        # "DROP SEQUENCE IF EXISTS  RESTRICT" without any name is a syntax
        # error, so an empty input (e.g. unlink() on an empty recordset)
        # must not reach the database.
        return
    names = sql.SQL(',').join(map(sql.Identifier, seq_names))
    # RESTRICT is the default; it prevents dropping the sequence if an
    # object depends on it.
    cr.execute(sql.SQL("DROP SEQUENCE IF EXISTS {} RESTRICT").format(names))


def _alter_sequence(cr, seq_name, number_increment=None, number_next=None):
    """Alter the step and/or the next value of a PostgreSQL sequence.

    :param cr: database cursor
    :param seq_name: name of the sequence to alter
    :param number_increment: new step, or None to leave it untouched
    :param number_next: value to restart the sequence with, or None to leave
        it untouched
    :raises UserError: if ``number_increment`` is zero
    """
    if number_increment == 0:
        raise UserError(_("Step must not be zero."))

    cr.execute("SELECT relname FROM pg_class WHERE relkind=%s AND relname=%s", ('S', seq_name))
    found = cr.fetchone()
    if not found:
        # sequence is not created yet, we're inside create() so ignore it, will be set later
        return

    # Each requested option contributes one "<KEYWORD> <placeholder>" pair.
    options = (
        ("INCREMENT BY", number_increment),
        ("RESTART WITH", number_next),
    )
    statement = sql.SQL("ALTER SEQUENCE") + sql.Identifier(seq_name)
    params = []
    for keyword, value in options:
        if value is None:
            continue
        statement += sql.SQL(keyword) + sql.Placeholder()
        params.append(value)
    cr.execute(statement.join(' '), params)


def _select_nextval(cr, seq_name):
    """Consume and return the next value of a PostgreSQL sequence.

    :param cr: database cursor
    :param seq_name: name of the sequence, passed as a bound parameter
    :return: the row fetched from the cursor (not the bare number); callers
        in this file feed it straight into ``%``-formatting
    """
    cr.execute("SELECT nextval(%s)", [seq_name])
    row = cr.fetchone()
    return row


def _update_nogap(self, number_increment):
    """Reserve the current number of a 'no gap' sequence and advance it.

    :param self: single record exposing ``number_next``, ``_cr``, ``_table``,
        ``id`` and ``invalidate_cache``
    :param number_increment: amount added to the stored ``number_next``
    :return: the value ``number_next`` had on the record before the update
    """
    reserved = self.number_next
    cursor = self._cr
    table = self._table
    record_id = self.id
    # Lock the row first; NOWAIT makes a concurrent holder fail immediately
    # instead of blocking.
    cursor.execute("SELECT number_next FROM %s WHERE id=%%s FOR UPDATE NOWAIT" % table, [record_id])
    cursor.execute("UPDATE %s SET number_next=number_next+%%s WHERE id=%%s " % table,
                   (number_increment, record_id))
    # The column was changed behind the ORM's back, so drop the cached value.
    self.invalidate_cache(['number_next'], [record_id])
    return reserved


def _predict_nextval(self, seq_id):
    """Predict the next value of a PostgreSQL sequence without consuming it.

    :param self: object whose ``env.cr`` cursor is used for the query
    :param seq_id: suffix appended to ``ir_category_seq_`` to form the
        sequence name
    :return: the value the sequence is expected to hand out next
    """
    # Cannot use currval() as it requires prior call to nextval()
    seqname = 'ir_category_seq_%s' % seq_id
    cursor = self.env.cr
    if cursor._cnx.server_version < 100000:
        # Older servers: increment_by is read from the sequence relation itself.
        query = sql.SQL("SELECT last_value, increment_by, is_called FROM {}")
        params = []
    else:
        # Otherwise increment_by is looked up in the pg_sequences view.
        query = sql.SQL("""SELECT last_value,
                      (SELECT increment_by
                       FROM pg_sequences
                       WHERE sequencename = %s),
                      is_called
               FROM {}""")
        params = [seqname]
    cursor.execute(query.format(sql.Identifier(seqname)), params)
    last_value, increment_by, is_called = cursor.fetchone()
    if not is_called:
        # sequence has just been RESTARTed to return last_value next time
        return last_value
    return last_value + increment_by


def int_to_roman(num):
    """Convert a positive integer to its roman-numeral representation.

    :param num: integer to convert
    :return: the roman numeral as a string; an empty string when ``num`` is
        zero or negative
    """
    # Ordered from largest to smallest, subtractive pairs (CM, IV, ...) included.
    numerals = (
        (1000, "M"), (900, "CM"), (500, "D"), (400, "CD"),
        (100, "C"), (90, "XC"), (50, "L"), (40, "XL"),
        (10, "X"), (9, "IX"), (5, "V"), (4, "IV"),
        (1, "I"),
    )
    pieces = []
    for value, symbol in numerals:
        if num <= 0:
            break
        repeat = num // value
        pieces.append(symbol * repeat)
        num -= value * repeat
    return ''.join(pieces)


class IdgLetterCategory(models.Model):
    """E-Office letter category that also acts as its own numbering sequence.

    A category with the 'standard' implementation draws its numbers from a
    dedicated PostgreSQL sequence named ``ir_category_seq_<id>`` (id padded
    to three digits); a 'no_gap' category keeps its counter in the
    ``number_next`` column and increments it under a row lock.
    """
    _name = 'idg.letter.category'
    _description = 'E-Office Category'
    _order = "name"

    name = fields.Char(required=True)
    code = fields.Char(string='Sequence Code')
    implementation = fields.Selection([('standard', 'Standard'), ('no_gap', 'No gap')],
                                      string='Implementation', required=True, default='standard',
                                      help="While assigning a sequence number to a record, the 'no gap' sequence implementation ensures that each previous sequence number has been assigned already. "
                                           "While this sequence implementation will not skip any sequence number upon assignment, there can still be gaps in the sequence if records are deleted. "
                                           "The 'no gap' implementation is slower than the standard one.")
    active = fields.Boolean(default=True)
    prefix = fields.Char(help="Prefix value of the record for the sequence", trim=False)
    suffix = fields.Char(help="Suffix value of the record for the sequence", trim=False)
    number_next = fields.Integer(string='Next Number', required=True, default=1, help="Next number of this sequence")
    number_next_actual = fields.Integer(compute='_get_number_next_actual', inverse='_set_number_next_actual',
                                        string='Actual Next Number',
                                        help="Next number that will be used. This number can be incremented "
                                             "frequently so the displayed value might already be obsolete")
    number_increment = fields.Integer(string='Step', required=True, default=1,
                                      help="The next number of the sequence will be incremented by this number")
    padding = fields.Integer(string='Sequence Size', required=True, default=0,
                             help="Odoo will automatically adds some '0' on the left of the "
                                  "'Next Number' to get the required padding size.")
    company_id = fields.Many2one('res.company', string='Company',
                                 default=lambda s: s.env.company)
    approver_ids = fields.One2many('idg.letter.category.approver', 'category_id', string='Approves')
    use_date_range = fields.Boolean(string='Use subsequences per date_range', default=False)

    def _get_number_next_actual(self):
        """Compute ``number_next_actual``.

        The value is 0 for records without an id, the stored ``number_next``
        for non-standard implementations, and the value predicted from the
        PostgreSQL sequence for the standard implementation.
        """
        for seq in self:
            if not seq.id:
                seq.number_next_actual = 0
            elif seq.implementation != 'standard':
                seq.number_next_actual = seq.number_next
            else:
                seq_id = "%03d" % seq.id
                seq.number_next_actual = _predict_nextval(self, seq_id)

    def _set_number_next_actual(self):
        """Inverse of ``number_next_actual``: store it as ``number_next`` (1 when empty)."""
        for seq in self:
            seq.write({'number_next': seq.number_next_actual or 1})

    @api.model
    def _get_current_sequence(self, sequence_date=None):
        """Return the object holding the ``number_next`` to consider.

        :param sequence_date: currently unused
        :return: ``self`` when ``use_date_range`` is not set, otherwise None
        """
        if not self.use_date_range:
            return self
        # NOTE: date-range sub-sequences are not implemented for categories,
        # so there is nothing to return in that mode.
        return None

    def _date_range_subsequences(self):
        """Return the date-range sub-sequences of this category, if any.

        This model does not declare a ``date_range_ids`` field itself; the
        field is only read when it actually exists on the model, so that
        write() does not fail with an AttributeError.
        """
        if 'date_range_ids' in self._fields:
            return self.date_range_ids
        return ()

    @api.model
    def create(self, values):
        """Create a category and, for the standard implementation, its PostgreSQL sequence.

        :param values: dict of field values for the new record
        :return: the created record
        """
        seq = super(IdgLetterCategory, self).create(values)
        if values.get('implementation', 'standard') == 'standard':
            _create_sequence(self._cr, "ir_category_seq_%03d" % seq.id, values.get('number_increment', 1),
                             values.get('number_next', 1))
        return seq

    def unlink(self):
        """Drop the PostgreSQL sequences of the records, then delete the records."""
        seq_names = ["ir_category_seq_%03d" % x.id for x in self]
        if seq_names:
            # Without any name the DROP statement would be invalid SQL.
            _drop_sequences(self._cr, seq_names)
        return super(IdgLetterCategory, self).unlink()

    def write(self, values):
        """Write ``values`` and keep the PostgreSQL sequences in sync.

        The previous implementation of each record is compared with the new
        one: a standard sequence is altered or dropped, and a sequence is
        created when a record switches to the standard implementation.
        """
        new_implementation = values.get('implementation')
        for seq in self:
            # 4 cases: we test the previous impl. against the new one.
            step = values.get('number_increment', seq.number_increment)
            start = values.get('number_next', seq.number_next)
            seq_name = "ir_category_seq_%03d" % seq.id
            sub_seqs = seq._date_range_subsequences()
            if seq.implementation == 'standard':
                if new_implementation in ('standard', None):
                    # Implementation has NOT changed.
                    # Only change sequence if really requested.
                    if values.get('number_next'):
                        _alter_sequence(self._cr, seq_name, number_next=start)
                    if seq.number_increment != step:
                        _alter_sequence(self._cr, seq_name, number_increment=step)
                        if sub_seqs:
                            sub_seqs._alter_sequence(number_increment=step)
                else:
                    # standard -> no_gap: the PostgreSQL sequences are no longer needed
                    _drop_sequences(self._cr, [seq_name])
                    for sub_seq in sub_seqs:
                        _drop_sequences(self._cr, ["ir_category_seq_%03d_%03d" % (seq.id, sub_seq.id)])
            elif new_implementation not in ('no_gap', None):
                # no_gap -> standard: create the backing PostgreSQL sequences
                _create_sequence(self._cr, seq_name, step, start)
                for sub_seq in sub_seqs:
                    _create_sequence(self._cr, "ir_category_seq_%03d_%03d" % (seq.id, sub_seq.id), step, start)
        res = super(IdgLetterCategory, self).write(values)
        # DLE P179
        self.flush(values.keys())
        return res

    def _next_do(self):
        """Draw the next number and return it formatted with prefix, padding and suffix."""
        if self.implementation == 'standard':
            # NOTE: this is the fetched row rather than a bare number;
            # get_next_char() %-formats it directly.
            number_next = _select_nextval(self._cr, 'ir_category_seq_%03d' % self.id)
        else:
            number_next = _update_nogap(self, self.number_increment)
        return self.get_next_char(number_next)

    def _get_prefix_suffix(self, date=None, date_range=None):
        """Return the prefix and suffix with their date placeholders filled in.

        :param date: effective date; falls back to the context, then to now
        :param date_range: range date; falls back to the context, then to now
        :return: tuple ``(interpolated_prefix, interpolated_suffix)``
        :raises UserError: if the prefix or suffix cannot be interpolated
        """
        self.ensure_one()
        roman_month = '%rm'  # pseudo-directive: month as a roman numeral

        def _interpolate(template, mapping):
            return (template % mapping) if template else ''

        def _format(moment, directive):
            # '%rm' is not a real strftime directive, so it is handled here
            # for every variant (effective, range_ and current_).
            if directive == roman_month:
                return int_to_roman(int(moment.strftime('%m')))
            return moment.strftime(directive)

        def _interpolation_dict():
            context = self._context
            now = range_date = effective_date = datetime.now(pytz.timezone(context.get('tz') or 'UTC'))
            # The context key that is tested is also used as a fallback value:
            # reading only the legacy 'ir_sequence_*' key yielded None (and a
            # crash) whenever just the 'idg_category_*' key was set. The
            # legacy key still wins when both are present, as before.
            context_date = context.get('idg_category_date')
            if date or context_date:
                effective_date = fields.Datetime.from_string(
                    date or context.get('ir_sequence_date') or context_date)
            context_range = context.get('idg_category_date_range')
            if date_range or context_range:
                range_date = fields.Datetime.from_string(
                    date_range or context.get('ir_sequence_date_range') or context_range)

            sequences = {
                'year': '%Y', 'month': '%m', 'day': '%d', 'y': '%y', 'doy': '%j', 'woy': '%W',
                'weekday': '%w', 'h24': '%H', 'h12': '%I', 'min': '%M', 'sec': '%S', 'rmonth': roman_month,
            }
            res = {}
            for key, directive in sequences.items():
                res[key] = _format(effective_date, directive)
                res['range_' + key] = _format(range_date, directive)
                res['current_' + key] = _format(now, directive)
            return res

        mapping = _interpolation_dict()
        try:
            interpolated_prefix = _interpolate(self.prefix, mapping)
            interpolated_suffix = _interpolate(self.suffix, mapping)
        except (ValueError, KeyError, TypeError):
            # KeyError: unknown placeholder name; TypeError: wrong conversion
            # type for a placeholder. Both are user mistakes in the template.
            raise UserError(_('Invalid prefix or suffix for sequence \'%s\'') % self.name)
        return interpolated_prefix, interpolated_suffix

    def get_next_char(self, number_next):
        """Return ``number_next`` zero-padded to ``padding`` digits, wrapped in prefix and suffix."""
        interpolated_prefix, interpolated_suffix = self._get_prefix_suffix()
        return interpolated_prefix + '%%0%sd' % self.padding % number_next + interpolated_suffix

    def _next(self, sequence_date=None):
        """Return the next formatted number of this category.

        :param sequence_date: currently unused
        :return: the formatted number, or None when ``use_date_range`` is set
            (date-range sub-sequences are not implemented for categories)
        """
        if not self.use_date_range:
            return self._next_do()
        # Same return value as before, but no longer silent.
        _logger.warning("Date-range subsequences are not implemented for idg.letter.category %s; "
                        "no number was drawn.", self.ids)
        return None

    def next_by_id(self, sequence_date=None):
        """ Draw an interpolated string using the specified sequence."""
        self.check_access_rights('read')
        return self._next(sequence_date=sequence_date)

    @api.model
    def next_by_code(self, sequence_code, sequence_date=None):
        """ Draw an interpolated string using a sequence with the requested code.
            If several sequences with the correct code are available to the user
            (multi-company cases), the one from the user's current company will
            be used.

            :return: the formatted number, or False when no category matches
        """
        self.check_access_rights('read')
        company_id = self.env.company.id
        seq_ids = self.search([('code', '=', sequence_code), ('company_id', 'in', [company_id, False])],
                              order='company_id')
        if not seq_ids:
            _logger.debug(
                "No idg.letter.category has been found for code '%s'. "
                "Please make sure a sequence is set for current company.", sequence_code)
            return False
        return seq_ids[0]._next(sequence_date=sequence_date)

    # @api.model
    # def get_id(self, sequence_code_or_id, code_or_id='id'):
    #     """ Draw an interpolated string using the specified sequence.
    #
    #     The sequence to use is specified by the ``sequence_code_or_id``
    #     argument, which can be a code or an id (as controlled by the
    #     ``code_or_id`` argument. This method is deprecated.
    #     """
    #     _logger.warning("ir_sequence.get() and ir_sequence.get_id() are deprecated. "
    #                     "Please use ir_sequence.next_by_code() or ir_sequence.next_by_id().")
    #     if code_or_id == 'id':
    #         return self.browse(sequence_code_or_id).next_by_id()
    #     else:
    #         return self.next_by_code(sequence_code_or_id)
    #
    # @api.model
    # def get(self, code):
    #     """ Draw an interpolated string using the specified sequence.
    #
    #     The sequence to use is specified by its code. This method is
    #     deprecated.
    #     """
    #     return self.get_id(code, 'code')
    #


class IdgLetterCategoryApprover(models.Model):
    """Approver line attached to an E-Office letter category."""

    _name = 'idg.letter.category.approver'
    _description = 'E-Office Category Approver'

    # Category this line belongs to; inverse side of the category's
    # ``approver_ids`` one2many.
    category_id = fields.Many2one(comodel_name='idg.letter.category')
    # User referenced by this line (presumably the approver -- confirm with callers).
    user_id = fields.Many2one(comodel_name='res.users')
    # Defaults to True; NOTE(review): presumably marks the approval as
    # mandatory -- verify against the code that consumes this flag.
    required = fields.Boolean(default=True)