katgpucbf.recv module
Shared utilities for receiving SPEAD data.
- class katgpucbf.recv.BaseLayout[source]
Bases:
ABC
Abstract base class for chunk layouts to derive from.
- class katgpucbf.recv.Chunk(self: spead2._spead2.recv.Chunk, **kwargs)[source]
Bases:
Chunk
Collection of heaps passed to the GPU at one time.
It extends the spead2 base class to store a timestamp (computed from the chunk ID when the chunk is received), and optionally store a vkgdr device array.
When used as a context manager, it will call
recycle()
on exit.- sink: ReferenceType
- katgpucbf.recv.EVICTION_MODE = <EvictionMode.LOSSY: 0>
Eviction mode to use when some streams fall behind
- katgpucbf.recv.RX_SENSOR_TIMEOUT_CHUNKS = 10
Number of chunks before rx sensor status changes
- katgpucbf.recv.RX_SENSOR_TIMEOUT_MIN = 1.0
Minimum rx sensor status timeout in seconds
- class katgpucbf.recv.StatsCollector(counter_map: ~collections.abc.Mapping[str, tuple[str, str]], labelnames: ~collections.abc.Iterable[str] = (), namespace: str = '', registry: ~prometheus_client.registry.CollectorRegistry = <prometheus_client.registry.CollectorRegistry object>)[source]
Bases:
Collector
Collect statistics from spead2 streams as Prometheus metrics.
- add_stream(stream: ChunkRingStream | ChunkStreamGroupMember, labels: Iterable[str] = ()) None [source]
Register a new stream.
If the collector was constructed with a non-empty
labelnames
, thenlabels
must contain the same number of elements to provide the labels for the metrics that this stream will update.Warning
Calling this more than once with the same stream will cause that stream’s statistics to be counted multiple times.
- update() None [source]
Update the internal totals from the streams.
This is done automatically by
collect()
, but it can also be called explicitly. This may be useful to do just before a stream goes out of scope, to ensure that counter updates since the last scrape are not lost when the stream is garbage collected.
- katgpucbf.recv.add_reader(stream: ChunkRingStream | ChunkStreamGroupMember, *, src: str | list[tuple[str, int]], interface: str | None, ibv: bool, comp_vector: int, buffer: int) None [source]
Connect a stream to an underlying transport.
See the documentation for
Engine
for an explanation of the parameters.
- katgpucbf.recv.make_stream(*, layout: BaseLayout, spead_items: list[int], max_active_chunks: int, data_ringbuffer: ChunkRingbuffer, free_ringbuffer: ChunkRingbuffer, affinity: int, stream_stats: list[str], user_data: ndarray, max_heap_extra: int = 0, **kwargs: Any) ChunkRingStream [source]
Create a SPEAD receiver stream.
- Parameters:
layout – Heap size and chunking parameters.
spead_items – List of SPEAD item IDs to be expected in the heap headers.
max_active_chunks – Maximum number of chunks under construction.
data_ringbuffer – Output ringbuffer to which chunks will be sent.
free_ringbuffer – Ringbuffer for holding chunks for recycling once they’ve been used.
affinity – CPU core affinity for the worker thread (negative to not set an affinity).
stream_stats – Stats to hook up to prometheus.
user_data – Data to pass to the chunk placement callback
max_heap_extra – Maximum non-payload data written by the place callback
kwargs – Other keyword arguments are passed to
spead2.recv.StreamConfig
.
- katgpucbf.recv.make_stream_group(*, layout: BaseLayout, spead_items: list[int], max_active_chunks: int, data_ringbuffer: ChunkRingbuffer, free_ringbuffer: ChunkRingbuffer, affinity: Sequence[int], stream_stats: list[str], user_data: ndarray, max_heap_extra: int = 0, **kwargs: Any) ChunkStreamRingGroup [source]
Create a group of SPEAD receiver streams.
- Parameters:
layout – Heap size and chunking parameters.
spead_items – List of SPEAD item IDs to be expected in the heap headers.
max_active_chunks – Maximum number of chunks under construction.
data_ringbuffer – Output ringbuffer to which chunks will be sent.
free_ringbuffer – Ringbuffer for holding chunks for recycling once they’ve been used.
affinity – CPU core affinities for the worker threads (negative to not set an affinity). The length of this list determines the number of streams to create.
stream_stats – Stats to hook up to prometheus.
user_data – User data to pass to the chunk callback. It must have a field called stats_base, which will be filled in appropriately (modifying the argument).
max_heap_extra – Maximum non-payload data written by the place callback
kwargs – Other keyword arguments are passed to
spead2.recv.StreamConfig
.