katgpucbf.recv module
Shared utilities for receiving SPEAD data.
- class katgpucbf.recv.BaseLayout[source]
Bases: ABC
Abstract base class for chunk layouts to derive from.
- class katgpucbf.recv.Chunk(self: spead2._spead2.recv.Chunk, **kwargs)[source]
Bases: Chunk
Collection of heaps passed to the GPU at one time.
It extends the spead2 base class to store a timestamp (computed from the chunk ID when the chunk is received), and optionally store a vkgdr device array.
When used as a context manager, it will call recycle() on exit.
- sink: ReferenceType
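The context-manager behaviour can be pictured with a small stand-in. The sketch below does not use katgpucbf or spead2: ToyChunk and ToySink are hypothetical names, and the idea that recycle() hands the chunk back to whatever sink weakly references is an assumption made for illustration, not a description of the actual implementation.

```python
import weakref


class ToySink:
    """Hypothetical stand-in for the object that takes chunks back."""

    def __init__(self) -> None:
        self.free: list["ToyChunk"] = []

    def add_free_chunk(self, chunk: "ToyChunk") -> None:
        self.free.append(chunk)


class ToyChunk:
    """Hypothetical chunk that returns itself to its sink when recycled."""

    def __init__(self, sink: ToySink) -> None:
        # Hold the sink weakly, mirroring the ReferenceType annotation.
        self.sink = weakref.ref(sink)

    def recycle(self) -> None:
        sink = self.sink()
        if sink is not None:  # the sink may already have been destroyed
            sink.add_free_chunk(self)

    def __enter__(self) -> "ToyChunk":
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Runs on normal exit and when an exception propagates.
        self.recycle()


sink = ToySink()
chunk = ToyChunk(sink)
with chunk:
    pass  # process the chunk here
recycled = sink.free == [chunk]
```

Using the with statement means the chunk is handed back even if processing raises, which is the usual reason to prefer it over calling recycle() by hand.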
- katgpucbf.recv.EVICTION_MODE = <EvictionMode.LOSSY: 0>
Eviction mode to use when some streams fall behind
- katgpucbf.recv.RECV_SENSOR_TIMEOUT_CHUNKS = 10
Number of chunks before rx sensor status changes
- katgpucbf.recv.RECV_SENSOR_TIMEOUT_MIN = 1.0
Minimum recv sensor status timeout in seconds
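The two timeout constants suggest a timeout that scales with the chunk duration but never drops below a floor. How they are actually combined is not stated here. The sketch below assumes the timeout is the larger of the two quantities; chunk_duration (seconds per chunk) and sensor_timeout are hypothetical names.

```python
RECV_SENSOR_TIMEOUT_CHUNKS = 10  # value documented above
RECV_SENSOR_TIMEOUT_MIN = 1.0  # value documented above, in seconds


def sensor_timeout(chunk_duration: float) -> float:
    """Assumed rule: N chunks' worth of time, but never below the minimum."""
    return max(RECV_SENSOR_TIMEOUT_MIN, RECV_SENSOR_TIMEOUT_CHUNKS * chunk_duration)


short_chunks = sensor_timeout(0.01)  # 10 chunks span 0.1 s, so the floor applies
long_chunks = sensor_timeout(0.5)  # 10 chunks span 5 s, above the floor
```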
- class katgpucbf.recv.StatsCollector(counter_map: ~collections.abc.Mapping[str, tuple[str, str]], labelnames: ~collections.abc.Iterable[str] = (), namespace: str = '', registry: ~prometheus_client.registry.CollectorRegistry = <prometheus_client.registry.CollectorRegistry object>)[source]
Bases: Collector
Collect statistics from spead2 streams as Prometheus metrics.
- add_stream(stream: ChunkRingStream, labels: Iterable[str] = ()) None[source]
Register a new stream.
If the collector was constructed with a non-empty labelnames, then labels must contain the same number of elements to provide the labels for the metrics that this stream will update.
Warning
Calling this more than once with the same stream will cause that stream’s statistics to be counted multiple times.
- add_stream_group(stream_group: ChunkStreamRingGroup, labels: Iterable[str] = ()) None[source]
Register all the streams in a stream group.
If the collector was constructed with a non-empty labelnames, then labels must contain the same number of elements to provide the labels for the metrics that this stream will update.
Warning
Calling this more than once with the same stream group will cause that group’s statistics to be counted multiple times.
- update() None[source]
Update the internal totals from the streams.
This is done automatically by collect(), but it can also be called explicitly. This may be useful to do just before a stream goes out of scope, to ensure that counter updates since the last scrape are not lost when the stream is garbage collected.
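Both warnings and the note on update() follow from keeping running totals separately from the streams. The sketch below is a toy model in plain Python, not the katgpucbf implementation: ToyStream, ToyCollector and the per-registration bookkeeping are assumptions chosen to show why an explicit update() captures counts and why registering a stream twice double-counts.

```python
class ToyStream:
    """Hypothetical stream exposing a monotonically increasing counter."""

    def __init__(self) -> None:
        self.heaps = 0


class ToyCollector:
    """Hypothetical collector keeping totals separately from the streams."""

    def __init__(self) -> None:
        self.total = 0
        # One entry per registration: [stream, counter value last seen].
        self._registrations: list[list] = []

    def add_stream(self, stream: ToyStream) -> None:
        self._registrations.append([stream, stream.heaps])

    def update(self) -> None:
        # Fold each registration's change since the last update into the total.
        for entry in self._registrations:
            stream, last = entry
            self.total += stream.heaps - last
            entry[1] = stream.heaps


collector = ToyCollector()
stream = ToyStream()
collector.add_stream(stream)
stream.heaps = 7
collector.update()  # capture the count while the stream still exists
single_total = collector.total

doubled = ToyCollector()
other = ToyStream()
doubled.add_stream(other)
doubled.add_stream(other)  # registered twice by mistake
other.heaps = 7
doubled.update()
double_total = doubled.total
```

In this model the second registration creates a second bookkeeping entry for the same counter, so the same 7 heaps are added to the total twice.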
- katgpucbf.recv.add_reader(stream: AnyStream, *, src: str | list[tuple[str, int]], interface: str | None, ibv: bool, comp_vector: int, buffer: int) None[source]
Connect a stream to an underlying transport.
See the documentation for Engine for an explanation of the parameters.
- katgpucbf.recv.make_stream(*, layout: BaseLayout, spead_items: list[int], max_active_chunks: int, data_ringbuffer: ChunkRingbuffer, free_ringbuffer: ChunkRingbuffer, affinity: int, stream_stats: list[str], user_data: ndarray, max_heap_extra: int = 0, **kwargs: Any) ChunkRingStream[source]
Create a SPEAD receiver stream.
- Parameters:
layout – Heap size and chunking parameters.
spead_items – List of SPEAD item IDs to be expected in the heap headers.
max_active_chunks – Maximum number of chunks under construction.
data_ringbuffer – Output ringbuffer to which chunks will be sent.
free_ringbuffer – Ringbuffer for holding chunks for recycling once they’ve been used.
affinity – CPU core affinity for the worker thread (negative to not set an affinity).
stream_stats – Stats to hook up to prometheus.
user_data – Data to pass to the chunk placement callback.
max_heap_extra – Maximum non-payload data written by the place callback.
kwargs – Other keyword arguments are passed to spead2.recv.StreamConfig.
- katgpucbf.recv.make_stream_group(*, layout: BaseLayout, spead_items: list[int], max_active_chunks: int, data_ringbuffer: ChunkRingbuffer, free_ringbuffer: ChunkRingbuffer, affinity: Sequence[int], stream_stats: list[str], user_data: ndarray, max_heap_extra: int = 0, **kwargs: Any) ChunkStreamRingGroup[source]
Create a group of SPEAD receiver streams.
- Parameters:
layout – Heap size and chunking parameters.
spead_items – List of SPEAD item IDs to be expected in the heap headers.
max_active_chunks – Maximum number of chunks under construction.
data_ringbuffer – Output ringbuffer to which chunks will be sent.
free_ringbuffer – Ringbuffer for holding chunks for recycling once they’ve been used.
affinity – CPU core affinities for the worker threads (negative to not set an affinity). The length of this list determines the number of streams to create.
stream_stats – Stats to hook up to prometheus.
user_data – User data to pass to the chunk callback. It must have a field called stats_base, which will be filled in appropriately (modifying the argument).
max_heap_extra – Maximum non-payload data written by the place callback.
kwargs – Other keyword arguments are passed to spead2.recv.StreamConfig.