katgpucbf.recv module

Shared utilities for receiving SPEAD data.

class katgpucbf.recv.BaseLayout[source]

Bases: ABC

Abstract base class for chunk layouts to derive from.

abstract property batch_heaps: int

Number of heaps per chunk, on the axes other than time.

abstract property chunk_batches: int

Number of batches per chunk.

property chunk_bytes: int

Number of bytes per chunk.

property chunk_heaps: int

Number of heaps per chunk.

chunk_place(user_data: ndarray) LowLevelCallable[source]

Generate low-level code for placing heaps in chunks.

Parameters:

user_data – Data to pass to the placement callback

abstract property chunk_timestamp_step: int

Expected increase in timestamp from one chunk to the next.

abstract property heap_bytes: int

Number of payload bytes per heap.

abstract property heap_sample_count: int

Number of samples per heap.

The meaning of samples is up to the concrete subclass. It is used only for statistics.

class katgpucbf.recv.Chunk(*args, sink: ChunkRingPair, device: object = None, **kwargs)[source]

Bases: Chunk

Collection of heaps passed to the GPU at one time.

It extends the spead2 base class to store a timestamp (computed from the chunk ID when the chunk is received), and optionally store a vkgdr device array.

When used as a context manager, it will call recycle() on exit.

device: object
recycle() None[source]

Return the chunk to the owning stream/group.

sink: ReferenceType
timestamp: int
class katgpucbf.recv.Counters(heaps: Counter, chunks: Counter, samples: Counter, bytes: Counter, missing_heaps: Counter, clipped_samples: Counter | None = None)[source]

Bases: object

Prometheus counters for received data.

Each module should define a single global instance.

PER_POL_COUNTERS: ClassVar = ['heaps', 'samples', 'bytes', 'missing_heaps', 'clipped_samples']

Counters that should be instanced by labels()

bytes: Counter
chunks: Counter
clipped_samples: Counter | None = None
heaps: Counter
labels(*labelvalues, **labelkwargs) Self[source]

Apply labels to counters listed in PER_POL_COUNTERS.

missing_heaps: Counter
samples: Counter
katgpucbf.recv.EVICTION_MODE = <EvictionMode.LOSSY: 0>

Eviction mode to use when some streams fall behind

katgpucbf.recv.RECV_SENSOR_TIMEOUT_CHUNKS = 10

Number of chunks before rx sensor status changes

katgpucbf.recv.RECV_SENSOR_TIMEOUT_MIN = 1.0

Minimum recv sensor status timeout in seconds

class katgpucbf.recv.StatsCollector(counter_map: ~collections.abc.Mapping[str, tuple[str, str]], labelnames: ~collections.abc.Iterable[str] = (), namespace: str = '', registry: ~prometheus_client.registry.CollectorRegistry = <prometheus_client.registry.CollectorRegistry object>)[source]

Bases: Collector

Collect statistics from spead2 streams as Prometheus metrics.

add_label_set(labels: Iterable[str]) tuple[str, ...][source]

Ensure that statistics exist for a given label set.

This is automatically done by add_stream() and add_stream_group(). It may be useful if the streams will only be added later but the statistics are required now.

It is a no-op to call this function with a label set that already exists.

Returns:

The input labels as a tuple

Return type:

labels

add_stream(stream: ChunkRingStream, labels: Iterable[str] = ()) None[source]

Register a new stream.

If the collector was constructed with a non-empty labelnames, then labels must contain the same number of elements to provide the labels for the metrics that this stream will update.

Warning

Calling this more than once with the same stream will cause that stream’s statistics to be counted multiple times.

add_stream_group(stream_group: ChunkStreamRingGroup, labels: Iterable[str] = ()) None[source]

Register all the streams in a stream group.

If the collector was constructed with a non-empty labelnames, then labels must contain the same number of elements to provide the labels for the metrics that this stream will update.

Warning

Calling this more than once with the same stream group will cause that group’s statistics to be counted multiple times.

collect() Iterable[Metric][source]

Implement Prometheus’ Collector interface.

update() None[source]

Update the internal totals from the streams.

This is done automatically by collect(), but it can also be called explicitly. This may be useful to do just before a stream goes out of scope, to ensure that counter updates since the last scrape are not lost when the stream is garbage collected.

katgpucbf.recv.add_reader(stream: AnyStream, *, src: str | list[tuple[str, int]], interface: str | None, ibv: bool, comp_vector: int, buffer_size: int) None[source]

Connect a stream to an underlying transport.

See the documentation for FEngine for an explanation of the parameters.

async katgpucbf.recv.iter_chunks(ringbuffer: ChunkRingbuffer, layout: BaseLayout, sensors: SensorSet, time_converter: TimeConverter, pols: Sequence[tuple[str, str]] | None, counters: Counters, stats_collector: StatsCollector) AsyncGenerator[Chunk, None][source]

Iterate over the chunks and update sensors.

It also populates the chunk timestamp.

If counters has a clipped_samples counter, the chunks must have an extra field containing saturation counts, which will be used to populate that counter.

Parameters:
  • ringbuffer – Source of chunks.

  • layout – Structure of the streams.

  • sensors – Sensor set containing at least the sensors created by make_sensors().

  • time_converter – Converter to turn data timestamps into sensor timestamps.

  • pols – List of polarisation labels, or None if the sensors are not separated by polarisation. If polarisations are separated, this must be the first axis in Chunk.data and Chunk.present. Each element is tuple of the Prometheus label and the katcp sensor prefix.

  • counters – Prometheus counters

  • stats_collector – Statistics collector. This must already be associated with the stream or stream group, but this function will ensure it is updated at the end of the stream.

katgpucbf.recv.make_sensors(sensor_timeout: float, prefixes: Sequence[str] = ('',)) SensorSet[source]

Create the sensors needed to hold receiver statistics.

Parameters:
  • sensor_timeout – Time (in seconds) without updates before sensors for received data go into error and sensors for missing data become nominal.

  • prefixes – Prefixes to prepend to sensor names (e.g., for separate polarisations), including the trailing dot if non-empty.

katgpucbf.recv.make_stream(*, layout: BaseLayout, spead_items: list[int], max_active_chunks: int, data_ringbuffer: ChunkRingbuffer, free_ringbuffer: ChunkRingbuffer, affinity: int, stream_stats: list[str], user_data: ndarray, max_heap_extra: int = 0, **kwargs: Any) ChunkRingStream[source]

Create a SPEAD receiver stream.

Parameters:
  • layout – Heap size and chunking parameters.

  • spead_items – List of SPEAD item IDs to be expected in the heap headers.

  • max_active_chunks – Maximum number of chunks under construction.

  • data_ringbuffer – Output ringbuffer to which chunks will be sent.

  • free_ringbuffer – Ringbuffer for holding chunks for recycling once they’ve been used.

  • affinity – CPU core affinity for the worker thread (negative to not set an affinity).

  • stream_stats – Stats to hook up to prometheus.

  • user_data – Data to pass to the chunk placement callback. It must have a record type with a stats_base element, which will be populated with the index of the first custom statistic.

  • max_heap_extra – Maximum non-payload data written by the place callback

  • kwargs – Other keyword arguments are passed to spead2.recv.StreamConfig.

katgpucbf.recv.make_stream_group(*, layout: BaseLayout, spead_items: list[int], max_active_chunks: int, data_ringbuffer: ChunkRingbuffer, free_ringbuffer: ChunkRingbuffer, affinity: Sequence[int], stream_stats: list[str], user_data: ndarray, max_heap_extra: int = 0, **kwargs: Any) ChunkStreamRingGroup[source]

Create a group of SPEAD receiver streams.

Parameters:
  • layout – Heap size and chunking parameters.

  • spead_items – List of SPEAD item IDs to be expected in the heap headers.

  • max_active_chunks – Maximum number of chunks under construction.

  • data_ringbuffer – Output ringbuffer to which chunks will be sent.

  • free_ringbuffer – Ringbuffer for holding chunks for recycling once they’ve been used.

  • affinity – CPU core affinities for the worker threads (negative to not set an affinity). The length of this list determines the number of streams to create.

  • stream_stats – Stats to hook up to prometheus.

  • user_data – User data to pass to the chunk callback. It must have a field called stats_base, which will be filled in appropriately (modifying the argument). The length must be the same as the length of affinity (i.e., one element per stream).

  • max_heap_extra – Maximum non-payload data written by the place callback

  • kwargs – Other keyword arguments are passed to spead2.recv.StreamConfig.