"""Methods for serialization of Task and Queue."""
import json
import shutil
from functools import wraps
from os import PathLike
from pathlib import Path
from typing import Optional

import attr
from xdg import xdg_data_home

from ..boilerplate.observable import Event, EventType
from ..boilerplate.timeutils import dump_iso_dt, load_iso_dt
from .models import Queue, QueueManager, Task
# Names exported by ``from <this module> import *``: only the ``Database`` class.
__all__ = [
"Database",
]
def check_locking(func):
    """Decorate a ``Database`` method so that it only runs while the lock is held.

    The returned wrapper reads ``self.locked`` on every call.  When it is
    truthy the call is forwarded to ``func`` unchanged and its result is
    returned; otherwise ``RuntimeError`` is raised and ``func`` is not called.
    """

    @wraps(func)
    def guarded(self: "Database", *args, **kwargs):
        # Guard clause: refuse to act on the on-disk state without the lock.
        if not self.locked:
            raise RuntimeError("Database is not controlling the lock!")
        return func(self, *args, **kwargs)

    return guarded
class Database:
    """Serialize queues and tasks into files on disk in tree-like directories.

    Layout below ``data_dir``:

    * ``.lock`` -- lock file, present while an instance holds the lock
    * ``<queue uuid>.json`` -- queue metadata
    * ``<queue uuid>/<task uuid>.json`` -- one file per task

    Use the instance as a context manager to take the lock.  Methods decorated
    with ``check_locking`` raise ``RuntimeError`` when called without it.
    """

    def __init__(
        self, queue_manager: QueueManager, data_dir: Optional[PathLike] = None
    ):
        """Set up data directory, load stored state and subscribe to changes.

        :param queue_manager: manager that is filled with the queues found on
            disk and then observed for changes.
        :param data_dir: parent of the ``aside`` data directory (a ``str`` or
            any path-like object); defaults to the XDG data home.
        """
        self.locked = False
        # Coerce to ``Path`` so that strings and generic path-likes support ``/``.
        base_dir = xdg_data_home() if data_dir is None else Path(data_dir)
        self.data_dir = base_dir / "aside"
        # ``parents=True``: the parent directory may not exist yet.
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # Load stored state before subscribing: the observers are lock-checked
        # and the lock is not held here, so they must not fire while loading.
        self.populate_manager_from_disk(queue_manager)

        # NOTE(review): ``observe_queue_metadata`` and ``observe_task_metadata``
        # are not defined in this class body as reviewed -- confirm they are
        # provided, otherwise the lookups below raise ``AttributeError``.
        queue_manager.subscribe(self.observe_queue, "queues/[^/]*")
        queue_manager.subscribe(self.observe_queue_metadata, "queues/[^/]*/[^/]*")
        queue_manager.subscribe(self.observe_task, ".*/tasks/[^/]*")
        queue_manager.subscribe(self.observe_task_metadata, ".*/tasks/[^/]*/[^/]*")

    def __enter__(self):
        """Obtain lock on disk database representation.

        :return: this instance, so ``with Database(...) as db`` binds it.
        :raises RuntimeError: if the lock file already exists.
        """
        lock_file = self.data_dir / ".lock"
        try:
            # ``exist_ok=False`` makes creating the file double as the lock test.
            lock_file.touch(exist_ok=False)
        except FileExistsError:
            raise RuntimeError(
                "Database lock is present, another aside seems to be running!\n"
                "If you are sure, that no other instance is running, you can delete the "
                f"`{lock_file}` lockfile."
            ) from None
        self.locked = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release lock on disk database representation."""
        self.locked = False
        try:
            (self.data_dir / ".lock").unlink()
        except FileNotFoundError:
            # The lock file is already gone (the error raised by ``__enter__``
            # tells users how to delete it by hand); nothing left to release.
            pass

    @staticmethod
    def _write_json(path, data) -> None:
        """Serialize ``data`` and write it to ``path`` as UTF-8 encoded JSON."""
        # Serialize completely before touching the file, so that a
        # serialization error cannot leave a truncated file behind.
        text = json.dumps(data, ensure_ascii=False, indent=4)
        # ``ensure_ascii=False`` emits raw non-ASCII characters, so the
        # encoding is pinned instead of depending on the locale.
        path.write_text(text, encoding="utf-8")

    @staticmethod
    def _read_json(path):
        """Return the decoded content of the UTF-8 encoded JSON file ``path``."""
        return json.loads(path.read_text(encoding="utf-8"))

    @check_locking
    def observe_task(self, event: Event):
        """Observe tasks collection events raised by queue manager.

        Regexp string for matching events: ``.*/tasks/[^/]*``
        """
        task_uuid = event.split_attr_path[-1]
        queue = event.get_nested_object(2)
        if event.event_type is EventType.ADD:
            self._dump_task(queue, event.get_nested_object())
        elif event.event_type is EventType.DISCARD:
            self._drop_task(queue, task_uuid)
        else:
            pass  # pragma: no cover

    @check_locking
    def observe_queue(self, event: Event):
        """Observe queue collection events raised by queue manager.

        Regexp string for matching events: ``queues/[^/]*``
        """
        queue_uuid = event.split_attr_path[-1]
        if event.event_type is EventType.ADD:
            self._dump_queue(event.get_nested_object())
        elif event.event_type is EventType.DISCARD:
            self._drop_queue(queue_uuid)
        else:
            pass  # pragma: no cover

    @check_locking
    def _dump_queue(self, queue: Queue):
        """Write the metadata of ``queue`` and all of its tasks to disk."""
        self._dump_queue_metadata(queue)
        (self.data_dir / queue.uuid).mkdir(exist_ok=True)
        for task_uuid in queue.tasks.keys():
            self._dump_task(queue, queue.tasks[task_uuid])

    @check_locking
    def _dump_queue_metadata(self, queue: Queue):
        """Write the attributes of ``queue`` to ``<queue uuid>.json``.

        Attributes named in ``queue.__observable_attrs__`` are left out.
        """
        metadata = attr.asdict(
            queue, filter=lambda x, _: x.name not in queue.__observable_attrs__
        )
        self._write_json(self.data_dir / f"{queue.uuid}.json", metadata)

    @check_locking
    def _drop_queue(self, queue_uuid: str):
        """Delete the metadata file and the task directory of a queue."""
        (self.data_dir / f"{queue_uuid}.json").unlink()
        shutil.rmtree(self.data_dir / queue_uuid)

    @check_locking
    def _dump_task(self, queue: Queue, task: Task):
        """Write ``task`` to ``<queue uuid>/<task uuid>.json``."""
        queue_dir = self.data_dir / queue.uuid
        queue_dir.mkdir(exist_ok=True)
        # Build the whole document before the file is opened for writing.
        task_dict = attr.asdict(task)
        task_dict["deadline"] = dump_iso_dt(task_dict["deadline"])
        self._write_json(queue_dir / f"{task.uuid}.json", task_dict)

    @check_locking
    def _drop_task(self, queue: Queue, task_uuid: str):
        """Delete the file of the task ``task_uuid`` stored under ``queue``."""
        (self.data_dir / queue.uuid / f"{task_uuid}.json").unlink()

    def populate_manager_from_disk(self, queue_manager: QueueManager):
        """Take queue manager and fill it with queues from disk."""
        # ``glob`` yields paths that already include ``data_dir``; joining them
        # onto ``data_dir`` again duplicates the prefix for a relative path.
        for queue_path in self.data_dir.glob("*.json"):
            queue_metadata = self._read_json(queue_path)
            queue = Queue(uuid=queue_metadata["uuid"])
            queue_manager.queues.add(queue)
            for key, value in queue_metadata.items():
                if key != "uuid":
                    setattr(queue, key, value)
            self.populate_queue_from_disk(queue)

    def populate_queue_from_disk(self, queue: Queue) -> None:
        """Fill ``queue`` with the tasks stored in its directory on disk."""
        queue_dir = self.data_dir / queue.uuid
        if not queue_dir.is_dir():
            # No directory for this queue on disk: no stored tasks to load.
            return
        # Only ``*.json`` files are task files; skip anything else in there.
        for task_path in queue_dir.glob("*.json"):
            task_metadata = self._read_json(task_path)
            task_metadata["deadline"] = load_iso_dt(
                task_metadata["deadline"]
            ).astimezone()
            task = Task(uuid=task_metadata["uuid"])
            queue.tasks.add(task)
            for key, value in task_metadata.items():
                if key != "uuid":
                    setattr(task, key, value)