katgpucbf.recv module
Shared utilities for receiving SPEAD data.
- class katgpucbf.recv.BaseLayout[source]
Bases: ABC
Abstract base class for chunk layouts to derive from.
- chunk_place(user_data: ndarray) LowLevelCallable[source]
Generate low-level code for placing heaps in chunks.
- Parameters:
user_data – Data to pass to the placement callback
- class katgpucbf.recv.Chunk(*args, sink: ChunkRingPair, device: object = None, **kwargs)[source]
Bases: Chunk
Collection of heaps passed to the GPU at one time.
It extends the spead2 base class to store a timestamp (computed from the chunk ID when the chunk is received), and optionally store a vkgdr device array.
When used as a context manager, it will call recycle() on exit.
- sink: ReferenceType
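The context-manager behaviour described above can be illustrated with a minimal sketch. FakeSink and FakeChunk are hypothetical stand-ins written for this example, not the katgpucbf or spead2 classes; the only details taken from the documentation are that recycle() is called on exit and that the sink is held through a weak reference.

```python
import weakref
from collections import deque


class FakeSink:
    """Hypothetical stand-in for the object that owns the free-chunk queue."""

    def __init__(self) -> None:
        self.free: deque = deque()


class FakeChunk:
    """Hypothetical stand-in for Chunk: recycles itself when the block exits."""

    def __init__(self, sink: FakeSink) -> None:
        # Held weakly, mirroring the ReferenceType annotation on Chunk.sink.
        self.sink = weakref.ref(sink)

    def recycle(self) -> None:
        sink = self.sink()
        if sink is not None:  # the sink may already have been destroyed
            sink.free.append(self)

    def __enter__(self) -> "FakeChunk":
        return self

    def __exit__(self, *exc_info) -> None:
        self.recycle()


sink = FakeSink()
chunk = FakeChunk(sink)
with chunk:
    pass  # process the chunk's data here
recycled = list(sink.free)
```

Using a with block means the chunk is handed back even if processing raises an exception.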
- class katgpucbf.recv.Counters(heaps: Counter, chunks: Counter, samples: Counter, bytes: Counter, missing_heaps: Counter, clipped_samples: Counter | None = None)[source]
Bases: object
Prometheus counters for received data.
Each module should define a single global instance.
- PER_POL_COUNTERS: ClassVar = ['heaps', 'samples', 'bytes', 'missing_heaps', 'clipped_samples']
Counters that should be instanced by labels().
- bytes: Counter
- chunks: Counter
- heaps: Counter
- labels(*labelvalues, **labelkwargs) Self[source]
Apply labels to counters listed in PER_POL_COUNTERS.
- missing_heaps: Counter
- samples: Counter
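As a rough illustration of how labels() might treat the counters named in PER_POL_COUNTERS differently from the rest, the sketch below uses a hypothetical FakeCounter in place of prometheus_client.Counter and a cut-down FakeCounters with only three fields. It is an assumption about the general pattern, not the library's implementation.

```python
from dataclasses import dataclass, replace
from typing import ClassVar


class FakeCounter:
    """Hypothetical stand-in for prometheus_client.Counter."""

    def __init__(self, name: str, labelvalues: tuple = ()) -> None:
        self.name = name
        self.labelvalues = labelvalues

    def labels(self, *labelvalues: str) -> "FakeCounter":
        # Return a child counter bound to the given label values.
        return FakeCounter(self.name, labelvalues)


@dataclass
class FakeCounters:
    """Cut-down, hypothetical version of Counters with three fields."""

    PER_POL_COUNTERS: ClassVar[list[str]] = ["heaps", "bytes"]
    heaps: FakeCounter
    chunks: FakeCounter
    bytes: FakeCounter

    def labels(self, *labelvalues: str) -> "FakeCounters":
        # Only the per-polarisation counters receive labels; the rest are shared.
        changes = {
            name: getattr(self, name).labels(*labelvalues)
            for name in self.PER_POL_COUNTERS
        }
        return replace(self, **changes)


counters = FakeCounters(FakeCounter("heaps"), FakeCounter("chunks"), FakeCounter("bytes"))
pol0 = counters.labels("0")
```

In this sketch pol0.heaps and pol0.bytes carry the label value "0", while pol0.chunks is the same object as counters.chunks.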
- katgpucbf.recv.EVICTION_MODE = <EvictionMode.LOSSY: 0>
Eviction mode to use when some streams fall behind
- katgpucbf.recv.RECV_SENSOR_TIMEOUT_CHUNKS = 10
Number of chunks before rx sensor status changes
- katgpucbf.recv.RECV_SENSOR_TIMEOUT_MIN = 1.0
Minimum recv sensor status timeout in seconds
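The documentation does not state how these two constants are combined. One plausible reading is a timeout that scales with the chunk duration but never drops below the minimum. The helper below is a sketch under that assumption; sensor_timeout and chunk_seconds are hypothetical names introduced for the example.

```python
RECV_SENSOR_TIMEOUT_CHUNKS = 10
RECV_SENSOR_TIMEOUT_MIN = 1.0


def sensor_timeout(chunk_seconds: float) -> float:
    """Hypothetical helper: timeout in seconds for a given chunk duration."""
    return max(RECV_SENSOR_TIMEOUT_MIN, RECV_SENSOR_TIMEOUT_CHUNKS * chunk_seconds)


short = sensor_timeout(0.01)  # 10 chunks span only 0.1 s, so the floor applies
long = sensor_timeout(0.5)    # 10 chunks span 5 s, which exceeds the floor
```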
- class katgpucbf.recv.StatsCollector(counter_map: ~collections.abc.Mapping[str, tuple[str, str]], labelnames: ~collections.abc.Iterable[str] = (), namespace: str = '', registry: ~prometheus_client.registry.CollectorRegistry = <prometheus_client.registry.CollectorRegistry object>)[source]
Bases: Collector
Collect statistics from spead2 streams as Prometheus metrics.
- add_label_set(labels: Iterable[str]) tuple[str, ...][source]
Ensure that statistics exist for a given label set.
This is automatically done by add_stream() and add_stream_group(). It may be useful if the streams will only be added later but the statistics are required now.
It is a no-op to call this function with a label set that already exists.
- Returns:
The input labels as a tuple
- Return type:
labels
- add_stream(stream: ChunkRingStream, labels: Iterable[str] = ()) None[source]
Register a new stream.
If the collector was constructed with a non-empty labelnames, then labels must contain the same number of elements to provide the labels for the metrics that this stream will update.
Warning
Calling this more than once with the same stream will cause that stream’s statistics to be counted multiple times.
- add_stream_group(stream_group: ChunkStreamRingGroup, labels: Iterable[str] = ()) None[source]
Register all the streams in a stream group.
If the collector was constructed with a non-empty labelnames, then labels must contain the same number of elements to provide the labels for the metrics that the streams in this group will update.
Warning
Calling this more than once with the same stream group will cause that group’s statistics to be counted multiple times.
- update() None[source]
Update the internal totals from the streams.
This is done automatically by collect(), but it can also be called explicitly. This may be useful to do just before a stream goes out of scope, to ensure that counter updates since the last scrape are not lost when the stream is garbage collected.
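To show why an explicit update() before a stream disappears can matter, here is a minimal sketch of a collector that folds per-stream deltas into a running total. FakeStream and FakeCollector are hypothetical stand-ins, and the weak-reference bookkeeping is an assumption made for the example rather than a description of StatsCollector's internals.

```python
import weakref


class FakeStream:
    """Hypothetical stand-in for a stream exposing cumulative statistics."""

    def __init__(self) -> None:
        self.stats = {"heaps": 0}


class FakeCollector:
    """Folds per-stream deltas into a total that outlives the streams."""

    def __init__(self) -> None:
        self.total = 0
        self._entries = []  # pairs of [weak reference to stream, last value seen]

    def add_stream(self, stream: FakeStream) -> None:
        self._entries.append([weakref.ref(stream), 0])

    def update(self) -> None:
        alive = []
        for ref, last in self._entries:
            stream = ref()
            if stream is None:
                continue  # stream is gone; anything not yet folded in is lost
            current = stream.stats["heaps"]
            self.total += current - last
            alive.append([ref, current])
        self._entries = alive


collector = FakeCollector()
stream = FakeStream()
collector.add_stream(stream)
stream.stats["heaps"] = 5
collector.update()  # total is now 5
stream.stats["heaps"] = 8
collector.update()  # explicit update just before the stream goes away
del stream
collector.update()  # nothing left to read; the total is unchanged
total = collector.total
```

Had the second update() been skipped, the three heaps received after the first update would never have reached the total. Registering the same stream twice in this sketch would also add each delta twice, which matches the warning on add_stream().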
- katgpucbf.recv.add_reader(stream: AnyStream, *, src: str | list[tuple[str, int]], interface: str | None, ibv: bool, comp_vector: int, buffer_size: int) None[source]
Connect a stream to an underlying transport.
See the documentation for FEngine for an explanation of the parameters.
- async katgpucbf.recv.iter_chunks(ringbuffer: ChunkRingbuffer, layout: BaseLayout, sensors: SensorSet, time_converter: TimeConverter, pols: Sequence[tuple[str, str]] | None, counters: Counters, stats_collector: StatsCollector) AsyncGenerator[Chunk, None][source]
Iterate over the chunks and update sensors.
It also populates the chunk timestamp.
If counters has a clipped_samples counter, the chunks must have an extra field containing saturation counts, which will be used to populate that counter.
- Parameters:
ringbuffer – Source of chunks.
layout – Structure of the streams.
sensors – Sensor set containing at least the sensors created by make_sensors().
time_converter – Converter to turn data timestamps into sensor timestamps.
pols – List of polarisation labels, or None if the sensors are not separated by polarisation. If polarisations are separated, polarisation must be the first axis of Chunk.data and Chunk.present. Each element is a tuple of the Prometheus label and the katcp sensor prefix.
counters – Prometheus counters.
stats_collector – Statistics collector. This must already be associated with the stream or stream group, but this function will ensure it is updated at the end of the stream.
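The general shape of an async generator that stamps each chunk before yielding it can be sketched with the standard library alone. Everything here is hypothetical: FakeChunk, iter_fake_chunks, the timestamp_step parameter, the rule timestamp = chunk_id * timestamp_step, and the None sentinel used to end the stream are assumptions for illustration, and sensor and counter updates are omitted.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeChunk:
    """Hypothetical stand-in for Chunk with only the fields used here."""

    chunk_id: int
    timestamp: int = 0


async def iter_fake_chunks(queue: asyncio.Queue, timestamp_step: int):
    """Yield chunks from the queue until a None sentinel is received."""
    while (chunk := await queue.get()) is not None:
        # Populate the timestamp from the chunk ID before handing it on.
        chunk.timestamp = chunk.chunk_id * timestamp_step
        yield chunk


async def main() -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    for chunk_id in (3, 4):
        queue.put_nowait(FakeChunk(chunk_id))
    queue.put_nowait(None)  # end of stream (an assumption of this sketch)
    return [chunk.timestamp async for chunk in iter_fake_chunks(queue, 1024)]


timestamps = asyncio.run(main())
```

A consumer of the real iter_chunks() would use the same `async for` pattern, typically wrapping each chunk in a with block so that it is recycled once processed.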
- katgpucbf.recv.make_sensors(sensor_timeout: float, prefixes: Sequence[str] = ('',)) SensorSet[source]
Create the sensors needed to hold receiver statistics.
- Parameters:
sensor_timeout – Time (in seconds) without updates before sensors for received data go into error and sensors for missing data become nominal.
prefixes – Prefixes to prepend to sensor names (e.g., for separate polarisations), including the trailing dot if non-empty.
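The effect of the prefixes argument, including why the trailing dot belongs to the prefix, can be seen in a small sketch. The helper sensor_names and the base name "rx.timestamp" are illustrative assumptions; the actual sensor names are defined by make_sensors().

```python
def sensor_names(base_names, prefixes=("",)):
    """Hypothetical helper: prepend each prefix to each base sensor name."""
    return [f"{prefix}{name}" for prefix in prefixes for name in base_names]


# Separate sensors per polarisation: the prefix carries its own trailing dot.
names = sensor_names(["rx.timestamp"], prefixes=("pol0.", "pol1."))

# The default empty prefix leaves the base names unchanged.
default = sensor_names(["rx.timestamp"])
```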
- katgpucbf.recv.make_stream(*, layout: BaseLayout, spead_items: list[int], max_active_chunks: int, data_ringbuffer: ChunkRingbuffer, free_ringbuffer: ChunkRingbuffer, affinity: int, stream_stats: list[str], user_data: ndarray, max_heap_extra: int = 0, **kwargs: Any) ChunkRingStream[source]
Create a SPEAD receiver stream.
- Parameters:
layout – Heap size and chunking parameters.
spead_items – List of SPEAD item IDs to be expected in the heap headers.
max_active_chunks – Maximum number of chunks under construction.
data_ringbuffer – Output ringbuffer to which chunks will be sent.
free_ringbuffer – Ringbuffer for holding chunks for recycling once they’ve been used.
affinity – CPU core affinity for the worker thread (negative to not set an affinity).
stream_stats – Stats to hook up to prometheus.
user_data – Data to pass to the chunk placement callback. It must have a record type with a stats_base element, which will be populated with the index of the first custom statistic.
max_heap_extra – Maximum non-payload data written by the place callback
kwargs – Other keyword arguments are passed to spead2.recv.StreamConfig.
- katgpucbf.recv.make_stream_group(*, layout: BaseLayout, spead_items: list[int], max_active_chunks: int, data_ringbuffer: ChunkRingbuffer, free_ringbuffer: ChunkRingbuffer, affinity: Sequence[int], stream_stats: list[str], user_data: ndarray, max_heap_extra: int = 0, **kwargs: Any) ChunkStreamRingGroup[source]
Create a group of SPEAD receiver streams.
- Parameters:
layout – Heap size and chunking parameters.
spead_items – List of SPEAD item IDs to be expected in the heap headers.
max_active_chunks – Maximum number of chunks under construction.
data_ringbuffer – Output ringbuffer to which chunks will be sent.
free_ringbuffer – Ringbuffer for holding chunks for recycling once they’ve been used.
affinity – CPU core affinities for the worker threads (negative to not set an affinity). The length of this list determines the number of streams to create.
stream_stats – Stats to hook up to prometheus.
user_data – User data to pass to the chunk callback. It must have a field called stats_base, which will be filled in appropriately (modifying the argument). The length must be the same as the length of affinity (i.e., one element per stream).
max_heap_extra – Maximum non-payload data written by the place callback
kwargs – Other keyword arguments are passed to spead2.recv.StreamConfig.
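A user_data array that satisfies the constraints above (a stats_base field, and one element per entry of affinity) could be prepared along the following lines. This is a sketch only: the n_heaps field and the affinity values are hypothetical, and the integer type chosen for stats_base is an assumption rather than something the documentation specifies.

```python
import numpy as np

# Hypothetical record type. Only stats_base is required by the documentation;
# n_heaps is an illustrative extra field for the placement callback.
user_data_type = np.dtype([("stats_base", np.uintp), ("n_heaps", np.int64)])

affinity = [2, 3, -1]  # three streams; the last has no CPU affinity set

# One record per stream. stats_base starts at zero and is expected to be
# filled in by make_stream_group().
user_data = np.zeros(len(affinity), dtype=user_data_type)
user_data["n_heaps"] = 16
```

For make_stream() the same record type would be used with a single element.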