Source code for aside.models.database

"""Methods for serialization of Task and Queue."""
import json
import shutil
from functools import wraps
from os import PathLike
from typing import Optional

import attr
from xdg import xdg_data_home

from ..boilerplate.observable import Event, EventType
from ..boilerplate.timeutils import dump_iso_dt, load_iso_dt
from .models import Queue, QueueManager, Task

__all__ = [
    "Database",
]


[docs]def check_locking(func): @wraps(func) def checked(self: "Database", *args, **kwargs): if self.locked: return func(self, *args, **kwargs) raise RuntimeError("Database is not controlling the lock!") return checked
[docs]class Database: """Serialize into files on disk in tree-like directories.""" def __enter__(self): """Obtain lock on disk database representation.""" try: (self.data_dir / ".lock").touch(exist_ok=False) except FileExistsError: raise RuntimeError( f"Database lock is present, another aside seems to be running!\n" f"If you are sure, that no other instance is running, you can delete the " f"`{self.data_dir / '.lock'}` lockfile." ) from None self.locked = True def __exit__(self, exc_type, exc_val, exc_tb): """Release lock on disk database representation.""" self.locked = False (self.data_dir / ".lock").unlink()
[docs] def __init__( self, queue_manager: QueueManager, data_dir: Optional[PathLike] = None ): """Set up data directory.""" self.locked = False self.data_dir = (xdg_data_home() if data_dir is None else data_dir) / "aside" self.data_dir.mkdir(exist_ok=True) self.populate_manager_from_disk(queue_manager) queue_manager.subscribe(self.observe_queue, "queues/[^/]*") queue_manager.subscribe(self.observe_queue_metadata, "queues/[^/]*/[^/]*") queue_manager.subscribe(self.observe_task, ".*/tasks/[^/]*") queue_manager.subscribe(self.observe_task_metadata, ".*/tasks/[^/]*/[^/]*")
[docs] @check_locking def observe_task(self, event: Event): """Observe tasks collection events raised by queue manager. Regexp string for matching events: ``.*/tasks/[^/]*`` """ task_uuid = event.split_attr_path[-1] queue = event.get_nested_object(2) if event.event_type is EventType.ADD: self._dump_task(queue, event.get_nested_object()) elif event.event_type is EventType.DISCARD: self._drop_task(queue, task_uuid) else: pass # pragma: no cover
[docs] @check_locking def observe_task_metadata(self, event: Event): """Observe task events raised by queue manager. Regexp string for matching events: ``.*/tasks/[^/]*/[^/]*`` """ queue = event.get_nested_object(2) if event.event_type is EventType.CHANGE: self._dump_task(queue, event.get_nested_object(-1)) else: pass # pragma: no cover
[docs] @check_locking def observe_queue(self, event: Event): """Observe queue collection events raised by queue manager. Regexp string for matching events: ``queues/[^/]*`` """ queue_uuid = event.split_attr_path[-1] if event.event_type is EventType.ADD: self._dump_queue(event.get_nested_object()) elif event.event_type is EventType.DISCARD: self._drop_queue(queue_uuid) else: pass # pragma: no cover
[docs] @check_locking def observe_queue_metadata(self, event: Event): """Observe queue events raised by queue manager. Regexp string for matching events: ``queues/[^/]*/[^/]*`` """ if event.event_type is EventType.CHANGE: self._dump_queue(event.get_nested_object(-1)) else: pass # pragma: no cover
@check_locking def _dump_queue(self, queue: Queue): self._dump_queue_metadata(queue) queue_dir = self.data_dir / queue.uuid queue_dir.mkdir(exist_ok=True) for task in queue.tasks.keys(): self._dump_task(queue, queue.tasks[task]) @check_locking def _dump_queue_metadata(self, queue: Queue): metadata_dict = attr.asdict( queue, filter=lambda x, _: x.name not in queue.__observable_attrs__ ) with (self.data_dir / f"{queue.uuid}.json").open("w") as out_file: json.dump(metadata_dict, out_file, ensure_ascii=False, indent=4) @check_locking def _drop_queue(self, queue_uuid: str): queue_dir = self.data_dir / queue_uuid (self.data_dir / f"{queue_uuid}.json").unlink() shutil.rmtree(queue_dir) @check_locking def _dump_task(self, queue: Queue, task: Task): queue_path = self.data_dir / queue.uuid queue_path.mkdir(exist_ok=True) with (queue_path / f"{task.uuid}.json").open("w") as out_file: attr_asdict = attr.asdict(task) attr_asdict["deadline"] = dump_iso_dt(attr_asdict["deadline"]) json.dump(attr_asdict, out_file, ensure_ascii=False, indent=4) @check_locking def _drop_task(self, queue: Queue, task_uuid: str): (self.data_dir / queue.uuid / f"{task_uuid}.json").unlink()
[docs] def populate_manager_from_disk(self, queue_manager: QueueManager): """Take queue manager and fill it with queues from disk.""" for queue_path in self.data_dir.glob("*.json"): with (self.data_dir / queue_path).open("r") as readfile: queue_metadata = json.load(readfile) queue = Queue(uuid=queue_metadata["uuid"]) queue_manager.queues.add(queue) for k in queue_metadata: if not k == "uuid": setattr(queue, k, queue_metadata[k]) self.populate_queue_from_disk(queue)
[docs] def populate_queue_from_disk(self, queue: Queue) -> None: """Read queue by id from disk.""" for task_path in (self.data_dir / queue.uuid).iterdir(): with task_path.open("r") as readfile: task_metadata = json.load(readfile) task_metadata["deadline"] = load_iso_dt( task_metadata["deadline"] ).astimezone() task = Task(uuid=task_metadata["uuid"]) queue.tasks.add(task) for k in task_metadata: if not k == "uuid": setattr(task, k, task_metadata[k])