Source code for katgpucbf.monitor

################################################################################
# Copyright (c) 2020-2021, 2024 National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Monitor classes allowing for rudimentary performance monitoring.

Queues in the form of :class:`asyncio.Queue` are used for synchronisation
between coroutines in :mod:`katgpucbf.fgpu`, but we may like to know a bit more about
what's happening to them as items are pushed and popped. These metrics help us
identify bottlenecks: if the queues fill up (or the "free" queues run empty),
packets will be dropped.
"""

import asyncio
import contextlib
import json
import threading
from abc import ABC, abstractmethod
from collections.abc import Generator
from time import monotonic
from typing import Any, Self


[docs] class Monitor(ABC): """Base class for performance monitors. Subclasses can create :class:`Queue` objects which report their size when it changes via the mechanism defined in the derived :class:`Monitor` class. Each subclass will need to override the abstract methods to record the performance events. """ def __init__(self) -> None: self._time_base = monotonic()
[docs] def time(self) -> float: """Get a timestamp, relative to the creation time of the monitor.""" return monotonic() - self._time_base
[docs] @abstractmethod def event_qsize(self, name: str, qsize: int, maxsize: int) -> None: """Report the size and capacity of a queue. The queue `name` has current size `qsize` and capacity `maxsize`. All calls with the same name must report the same `maxsize`. """
[docs] @abstractmethod def event_qsize_delta(self, name: str, delta: int) -> None: """Report addition/removal of items from a queue. The queue `name` has `delta` new items in it (or removed if `delta` is negative). This is an alternative to using :meth:`event_qsize` when there is no easy way to obtain the absolute size of the queue. There must have been a previous call to :meth:`event_qsize` to specify the initial capacity. """
[docs] @abstractmethod def event_state(self, name: str, state: str) -> None: """Report the current state of a task. The state ``other`` is conventional when no more specific information is available. """
[docs] @contextlib.contextmanager def with_state(self, name: str, state: str, return_state: str = "other") -> Generator[None, None, None]: """Set a state for the duration of a block.""" self.event_state(name, state) yield self.event_state(name, return_state)
[docs] def make_queue(self, name: str, maxsize: int = 0) -> asyncio.Queue: """Create a :class:`Queue` that reports its size via this :class:`Monitor`.""" self.event_qsize(name, 0, maxsize) return Queue(self, name, maxsize)
[docs] def close(self) -> None: # noqa: B027 """Close the Monitor. In the base class this does nothing, but if derived classes implement something that needs to close cleanly (such as an output file), then this function can be overridden to do that. It is called when you ``__exit__`` from using the :class:`Monitor` as a context manager. """ pass
def __enter__(self) -> Self: return self def __exit__(self, *args) -> None: self.close()
class FileMonitor(Monitor):
    """Monitor that logs every event to a file.

    The output is one JSON object per line. Each object carries a ``time``
    key (from :meth:`Monitor.time`) and a ``type`` key (``qsize``,
    ``qsize-delta`` or ``state``), followed by keys named after the arguments
    of the corresponding ``event_*`` method.
    """

    def __init__(self, filename: str) -> None:
        super().__init__()
        self._lock = threading.Lock()
        # The file stays open until close() is called (which also happens on
        # leaving a ``with`` block).
        self._file = open(filename, "w")

    def close(self) -> None:
        """Run the base-class cleanup, then close the output file."""
        super().close()
        self._file.close()

    def _event(self, data) -> None:
        # Hold the lock across both writes so that a record and its line
        # terminator are written together.
        with self._lock:
            json.dump(data, self._file)
            self._file.write("\n")

    def _record(self, kind: str, **fields: object) -> None:
        """Stamp an event with the current time and its type, then write it."""
        self._event({"time": self.time(), "type": kind, **fields})

    def event_qsize(self, name: str, qsize: int, maxsize: int) -> None:
        """Write a ``qsize`` record."""
        self._record("qsize", name=name, qsize=qsize, maxsize=maxsize)

    def event_qsize_delta(self, name: str, delta: int) -> None:
        """Write a ``qsize-delta`` record."""
        self._record("qsize-delta", name=name, delta=delta)

    def event_state(self, name: str, state: str) -> None:
        """Write a ``state`` record."""
        self._record("state", name=name, state=state)
class NullMonitor(Monitor):
    """Monitor that discards every event while providing the full interface."""

    def event_qsize(self, name: str, qsize: int, maxsize: int) -> None:
        """Ignore the queue size report."""

    def event_qsize_delta(self, name: str, delta: int) -> None:
        """Ignore the queue size change."""

    def event_state(self, name: str, state: str) -> None:
        """Ignore the task state report."""

    def make_queue(self, name: str, maxsize: int = 0) -> asyncio.Queue:
        """Return a plain :class:`asyncio.Queue`; there is nothing to report to."""
        return asyncio.Queue(maxsize=maxsize)
class Queue(asyncio.Queue):
    """Extend :class:`asyncio.Queue` with performance monitoring.

    After every successful put or get, :meth:`~Monitor.event_qsize` is called
    on the parent :class:`Monitor` with the queue's new size and its capacity.

    Only :meth:`put_nowait` and :meth:`get_nowait` are overridden. The
    coroutines :meth:`~asyncio.Queue.put` and :meth:`~asyncio.Queue.get`
    inherited from :class:`asyncio.Queue` finish by calling
    ``self.put_nowait(item)`` / ``self.get_nowait()`` (this is how CPython
    implements them), so awaited puts/gets are reported through the same
    path. Overriding the coroutines as well would report each event twice.
    """

    def __init__(self, monitor: "Monitor", name: str, maxsize: int = 0) -> None:
        super().__init__(maxsize)
        self.monitor = monitor
        self.name = name

    def _report(self) -> None:
        """Report the current size and capacity of this queue to the monitor."""
        self.monitor.event_qsize(self.name, self.qsize(), self.maxsize)

    def put_nowait(self, item: object) -> None:
        """Put `item` without blocking, then report the new size.

        Raises :exc:`asyncio.QueueFull` (with no report) if the queue is full.
        """
        super().put_nowait(item)
        self._report()

    def get_nowait(self) -> Any:
        """Remove and return an item without blocking, then report the new size.

        Raises :exc:`asyncio.QueueEmpty` (with no report) if the queue is empty.
        """
        item = super().get_nowait()
        self._report()
        return item