katgpucbf.vgpu.send module

Send VDIF frames at a steady rate.

class katgpucbf.vgpu.send.RateLimiter(rate: float, burst_rate: float, capacity: int)

Bases: ABC, Generic

Process items at a limited rate.

This is an abstract base class; a subclass must supply the implementation that actually processes the items.

There must be a running event loop when this class is instantiated, and that same event loop must be running when the asynchronous methods are called.

Parameters:
  • rate – Normal pace for acceptance, in units per second, where the units are determined by item_size(). It can also be zero to disable this pacing.

  • burst_rate – Pace at which to catch up after falling behind (for example, because a sleep took too long or a garbage collection pause occurred). It can also be zero to disable this pacing. Note that if the producer does not supply items fast enough, the burst rate is not used to make up for time during which the queue was empty.

  • capacity – Maximum number of items in the queue (0 for unlimited).
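The interaction between rate and burst_rate can be illustrated with a small bookkeeping sketch. This is not the katgpucbf implementation: the class TwoRatePacer, its schedule() method and the queue_was_empty flag are hypothetical names chosen for illustration, and the algorithm is only one plausible reading of the parameter descriptions above.

```python
class TwoRatePacer:
    """Hypothetical pacing bookkeeping; NOT the katgpucbf implementation."""

    def __init__(self, rate: float, burst_rate: float, now: float) -> None:
        self.rate = rate              # normal pace, units per second (0 disables)
        self.burst_rate = burst_rate  # catch-up pace, units per second (0 disables)
        self.target = now             # ideal start time of the next item at `rate`
        self.burst_target = now       # earliest start time allowed by `burst_rate`

    def schedule(self, now: float, units: int, queue_was_empty: bool = False) -> float:
        """Return the time at which the next item may start, and advance the state."""
        if queue_was_empty:
            # Time spent with an empty queue is not made up later.
            self.target = max(self.target, now)
        start = now
        if self.rate > 0:
            start = max(start, self.target)
        if self.burst_rate > 0:
            start = max(start, self.burst_target)
        if self.rate > 0:
            # Advance from the old target (not from `start`), so that lateness
            # caused by a slow sleep or a GC pause can be recovered.
            self.target += units / self.rate
        if self.burst_rate > 0:
            # While catching up, items are still spaced at the burst pace.
            self.burst_target = start + units / self.burst_rate
        return start
```

In this sketch, a pacer that is on schedule starts each item at the time dictated by rate; after a stall, items are released at the burst_rate spacing until the ideal schedule is reached again.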

abstractmethod item_size(item: T) → int

Get the number of units in an item.

async send(item: T) → None

Add an item to the queue.

Note that this will return as soon as the item is admitted to the queue, rather than when it is processed. The item should thus not be modified after submitting it.

async stop() → None

Wait until all queued items have been processed.
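The send()/stop() contract can be sketched with a toy class built on asyncio.Queue. Everything here is an assumption made for illustration: ToyLimiter, its process() hook and the Recorder subclass are hypothetical and are not part of katgpucbf, and the sketch paces only by rate (it ignores burst_rate). It does rely on one documented fact about asyncio.Queue: a maxsize of zero means the queue is unbounded, which matches the meaning of capacity above.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

T = TypeVar("T")


class ToyLimiter(ABC, Generic[T]):
    """Illustrative stand-in; NOT katgpucbf.vgpu.send.RateLimiter."""

    def __init__(self, rate: float, capacity: int) -> None:
        self._rate = rate
        # asyncio.Queue treats maxsize <= 0 as unbounded.
        self._queue = asyncio.Queue(capacity)
        # Requires a running event loop, like the documented class.
        self._task = asyncio.get_running_loop().create_task(self._run())

    @abstractmethod
    def item_size(self, item: T) -> int:
        """Number of units in an item."""

    @abstractmethod
    def process(self, item: T) -> None:
        """Hypothetical hook that does the real work."""

    async def _run(self) -> None:
        while True:
            item = await self._queue.get()
            if item is None:  # sentinel queued by stop()
                break
            self.process(item)
            if self._rate > 0:
                await asyncio.sleep(self.item_size(item) / self._rate)

    async def send(self, item: T) -> None:
        # Returns once the item is queued, not once it has been processed.
        await self._queue.put(item)

    async def stop(self) -> None:
        await self._queue.put(None)
        await self._task  # completes after every queued item is processed


class Recorder(ToyLimiter[list[int]]):
    def __init__(self, rate: float, capacity: int) -> None:
        self.seen: list[list[int]] = []
        super().__init__(rate, capacity)

    def item_size(self, item: list[int]) -> int:
        return len(item)

    def process(self, item: list[int]) -> None:
        self.seen.append(list(item))


async def demo() -> list[list[int]]:
    limiter = Recorder(rate=1000.0, capacity=2)
    for i in range(3):
        await limiter.send([i] * 4)  # do not modify the list after this call
    await limiter.stop()
    return limiter.seen
```

Running asyncio.run(demo()) drives the toy limiter to completion. Because send() returns before the item is processed, the caller hands over a fresh list each time rather than reusing one buffer.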

class katgpucbf.vgpu.send.VDIFSender(dsts: list[tuple[str, int]], rate: float, burst_rate: float, capacity: int, *, ttl: int, buffer_size: int, interfaces: list[str])

Bases: RateLimiter[list[VDIFFrame]]

Send VDIF frames at a limited rate to a set of multicast addresses.

The units for rate and burst_rate are samples per second.

item_size(item: list[VDIFFrame]) → int

Get the number of units in an item.
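Since the units are samples, one natural reading is that the size of an item is the total number of samples across its frames. The sketch below assumes exactly that, which the text above does not spell out. FakeFrame and its n_samples attribute are hypothetical stand-ins, as the attributes of VDIFFrame are not shown here, and seconds_per_item is an illustrative helper rather than part of the module.

```python
from dataclasses import dataclass


@dataclass
class FakeFrame:
    """Hypothetical stand-in for VDIFFrame (real attributes not shown here)."""

    n_samples: int


def item_size(item: list[FakeFrame]) -> int:
    # Assumption: an item's size is the sum of the samples in its frames.
    return sum(frame.n_samples for frame in item)


def seconds_per_item(item: list[FakeFrame], rate: float) -> float:
    # Time budget for one item when pacing at `rate` samples per second.
    return item_size(item) / rate
```

Under this assumption, a list of two 2000-sample frames paced at 8000 samples per second would be allotted half a second.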