tools.py 3.14 KB
from datetime import (
    datetime,
    date,
    time,
    )
from decimal import Decimal
from mmap import (
    mmap,
    ACCESS_READ,
    )


#################
# Compare value #
#################
MIDNIGHT_TIME = time(0, 0, 0)


def split_time(t) -> tuple:
    if isinstance(t, datetime):
        return t.date(), t.time()
    return t, MIDNIGHT_TIME


def is_same(a, b) -> bool:
    if a == b:
        return True
    if not (isinstance(a, date) or isinstance(a, datetime)):
        return False
    date_a, time_a = split_time(a)
    date_b, time_b = split_time(b)
    if date_a != date_b:
        return False
    if isinstance(a, date) or isinstance(b, date):
        return True
    if time_a == time_b:
        return True
    return False


################
# Plain Object #
################
def date_str(v):
    return f'{v.year}-{v.month:02}-{v.day:02}'


def time_str(v):
    return f'{v.hour:02}:{v.minute:02}:{v.second:02}'


def datetime_str(v):
    s_date = date_str(v)
    s_time = time_str(v)
    s_msec = v.microsecond and f'.{v.microsecond}' or ''
    return f'{s_date} {s_time}{s_msec}'


def plain_value(v):
    if isinstance(v, str):
        return v
    if isinstance(v, datetime):
        return datetime_str(v)
    if isinstance(v, date):
        return date_str(v)
    if isinstance(v, Decimal):
        return float(v)
    if isinstance(v, time):
        return time_str(v)
    return v


def plain_values(d):
    r = dict()
    for key in d:
        v = d[key]
        r[str(key)] = plain_value(v)
    return r


def update(source: dict, target: dict):
    target_update = dict()
    log_msg = []
    for field in target:
        if field not in source:
            continue
        source_value = source[field]
        target_value = target[field]
        if not is_same(source_value, target_value):
            target_update[field] = source_value
            log_source_value = plain_value(source_value)
            log_target_value = plain_value(target_value)
            log_msg.append('{f} {t} to be {s}'.format(
                f=field, t=[log_target_value], s=[log_source_value]))
    return target_update, log_msg


class BacaFile:
    def __init__(self, filename, cari=''):
        if cari:
            self.is_found = False
        else:
            self.is_found = True
        is_first = True
        self.file = open(filename, 'rb')
        s = mmap(self.file.fileno(), 0, access=ACCESS_READ)
        while cari:
            pos = s.find(cari.encode('utf-8'))
            if pos == -1:  # Tidak ketemu
                return
            if pos == 0:  # Ketemu di baris ke-1
                self.is_found = True
                break
            if not is_first:
                self.file.seek(pos)
                self.file.readline()
                self.is_found = True
                break
            cari = '\n' + cari  # Cari di baris berikutnya
            self.file.seek(0)
            is_first = False

    def close(self):
        self.file.close()

    def readline(self):
        if self.is_found:
            return self.file.readline()

    def __enter__(self):
        return self.file

    def __exit__(self, type, value, traceback):
        self.close()