katgpucbf.monitor module

Monitor classes allowing for rudimentary performance monitoring.

Queues in the form of asyncio.Queue are used for synchronisation between coroutines in katgpucbf.fgpu, but we may like to know a bit more about what’s happening to them as items are pushed and popped. These metrics help us to see what bottlenecks there are, because if the queues get full (or the “free” queues get empty) it will result in dropped packets.

class katgpucbf.monitor.FileMonitor(filename: str)[source]

Bases: Monitor

Write events to a file.

The file contains JSON-formatted records, one per line. Each record contains time and type keys, with additional type-specific information corresponding to the arguments to the notification functions.

close() None[source]

Close the output file cleanly.

event_qsize(name: str, qsize: int, maxsize: int) None[source]

Report the size and capacity of a queue.

The queue name has current size qsize and capacity maxsize. All calls with the same name must report the same maxsize.

event_qsize_delta(name: str, delta: int) None[source]

Report addition/removal of items from a queue.

The queue name has delta new items in it (or removed if delta is negative). This is an alternative to using event_qsize() when there is no easy way to obtain the absolute size of the queue. There must have been a previous call to event_qsize() to specify the initial capacity.

event_state(name: str, state: str) None[source]

Report the current state of a task.

The state other is conventional when no more specific information is available.

class katgpucbf.monitor.Monitor[source]

Bases: ABC

Base class for performance monitors.

Subclasses can create Queue objects which report their size when it changes via the mechanism defined in the derived Monitor class.

Each subclass will need to override the abstract methods to record the performance events.

close() None[source]

Close the Monitor.

In the base class this does nothing, but if derived classes implement something that needs to close cleanly (such as an output file), then this function can be overridden to do that. It is called when you __exit__ from using the Monitor as a context manager.

abstract event_qsize(name: str, qsize: int, maxsize: int) None[source]

Report the size and capacity of a queue.

The queue name has current size qsize and capacity maxsize. All calls with the same name must report the same maxsize.

abstract event_qsize_delta(name: str, delta: int) None[source]

Report addition/removal of items from a queue.

The queue name has delta new items in it (or removed if delta is negative). This is an alternative to using event_qsize() when there is no easy way to obtain the absolute size of the queue. There must have been a previous call to event_qsize() to specify the initial capacity.

abstract event_state(name: str, state: str) None[source]

Report the current state of a task.

The state other is conventional when no more specific information is available.

make_queue(name: str, maxsize: int = 0) Queue[source]

Create a Queue that reports its size via this Monitor.

time() float[source]

Get a timestamp, relative to the creation time of the monitor.

with_state(name: str, state: str, return_state: str = 'other') Generator[None, None, None][source]

Set a state for the duration of a block.

class katgpucbf.monitor.NullMonitor[source]

Bases: Monitor

A do-nothing monitor that presents the required interface.

event_qsize(name: str, qsize: int, maxsize: int) None[source]

Report the size and capacity of a queue.

The queue name has current size qsize and capacity maxsize. All calls with the same name must report the same maxsize.

event_qsize_delta(name: str, delta: int) None[source]

Report addition/removal of items from a queue.

The queue name has delta new items in it (or removed if delta is negative). This is an alternative to using event_qsize() when there is no easy way to obtain the absolute size of the queue. There must have been a previous call to event_qsize() to specify the initial capacity.

event_state(name: str, state: str) None[source]

Report the current state of a task.

The state other is conventional when no more specific information is available.

make_queue(name: str, maxsize: int = 0) Queue[source]

Create a Queue that reports its size via this Monitor.

class katgpucbf.monitor.Queue(monitor: Monitor, name: str, maxsize: int = 0)[source]

Bases: Queue

Extend asyncio.Queue with performance monitoring.

The only functionality added by any of the overridden functions is to call event_qsize() upon put/get events, transmitting an event to the parent Monitor object, alerting it about the change.

async get() Any[source]

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

get_nowait() Any[source]

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

async put(item: object) None[source]

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

put_nowait(item: object) None[source]

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.