katgpucbf.recv module

Shared utilities for receiving SPEAD data.

class katgpucbf.recv.BaseLayout[source]

Bases: ABC

Abstract base class for chunk layouts to derive from.

property chunk_bytes: int

Number of bytes per chunk.

abstract property chunk_heaps: int

Number of heaps per chunk.

chunk_place(user_data: ndarray) LowLevelCallable[source]

Generate low-level code for placing heaps in chunks.

Parameters:

user_data – Data to pass to the placement callback

abstract property heap_bytes: int

Number of payload bytes per heap.

class katgpucbf.recv.Chunk(self: spead2._spead2.recv.Chunk, **kwargs)[source]

Bases: Chunk

Collection of heaps passed to the GPU at one time.

It extends the spead2 base class to store a timestamp (computed from the chunk ID when the chunk is received), and optionally store a vkgdr device array.

When used as a context manager, it will call recycle() on exit.

device: object
recycle() None[source]

Return the chunk to the owning stream/group.

sink: ReferenceType
timestamp: int
katgpucbf.recv.EVICTION_MODE = <EvictionMode.LOSSY: 0>

Eviction mode to use when some streams fall behind

katgpucbf.recv.RX_SENSOR_TIMEOUT_CHUNKS = 10

Number of chunks before rx sensor status changes

katgpucbf.recv.RX_SENSOR_TIMEOUT_MIN = 1.0

Minimum rx sensor status timeout in seconds

class katgpucbf.recv.StatsCollector(counter_map: ~collections.abc.Mapping[str, tuple[str, str]], labelnames: ~collections.abc.Iterable[str] = (), namespace: str = '', registry: ~prometheus_client.registry.CollectorRegistry = <prometheus_client.registry.CollectorRegistry object>)[source]

Bases: Collector

Collect statistics from spead2 streams as Prometheus metrics.

add_stream(stream: ChunkRingStream | ChunkStreamGroupMember, labels: Iterable[str] = ()) None[source]

Register a new stream.

If the collector was constructed with a non-empty labelnames, then labels must contain the same number of elements to provide the labels for the metrics that this stream will update.

Warning

Calling this more than once with the same stream will cause that stream’s statistics to be counted multiple times.

collect() Iterable[Metric][source]

Implement Prometheus’ Collector interface.

update() None[source]

Update the internal totals from the streams.

This is done automatically by collect(), but it can also be called explicitly. This may be useful to do just before a stream goes out of scope, to ensure that counter updates since the last scrape are not lost when the stream is garbage collected.

katgpucbf.recv.add_reader(stream: ChunkRingStream | ChunkStreamGroupMember, *, src: str | list[tuple[str, int]], interface: str | None, ibv: bool, comp_vector: int, buffer: int) None[source]

Connect a stream to an underlying transport.

See the documentation for Engine for an explanation of the parameters.

katgpucbf.recv.make_stream(*, layout: BaseLayout, spead_items: list[int], max_active_chunks: int, data_ringbuffer: ChunkRingbuffer, free_ringbuffer: ChunkRingbuffer, affinity: int, stream_stats: list[str], user_data: ndarray, max_heap_extra: int = 0, **kwargs: Any) ChunkRingStream[source]

Create a SPEAD receiver stream.

Parameters:
  • layout – Heap size and chunking parameters.

  • spead_items – List of SPEAD item IDs to be expected in the heap headers.

  • max_active_chunks – Maximum number of chunks under construction.

  • data_ringbuffer – Output ringbuffer to which chunks will be sent.

  • free_ringbuffer – Ringbuffer for holding chunks for recycling once they’ve been used.

  • affinity – CPU core affinity for the worker thread (negative to not set an affinity).

  • stream_stats – Stats to hook up to prometheus.

  • user_data – Data to pass to the chunk placement callback

  • max_heap_extra – Maximum non-payload data written by the place callback

  • kwargs – Other keyword arguments are passed to spead2.recv.StreamConfig.

katgpucbf.recv.make_stream_group(*, layout: BaseLayout, spead_items: list[int], max_active_chunks: int, data_ringbuffer: ChunkRingbuffer, free_ringbuffer: ChunkRingbuffer, affinity: Sequence[int], stream_stats: list[str], user_data: ndarray, max_heap_extra: int = 0, **kwargs: Any) ChunkStreamRingGroup[source]

Create a group of SPEAD receiver streams.

Parameters:
  • layout – Heap size and chunking parameters.

  • spead_items – List of SPEAD item IDs to be expected in the heap headers.

  • max_active_chunks – Maximum number of chunks under construction.

  • data_ringbuffer – Output ringbuffer to which chunks will be sent.

  • free_ringbuffer – Ringbuffer for holding chunks for recycling once they’ve been used.

  • affinity – CPU core affinities for the worker threads (negative to not set an affinity). The length of this list determines the number of streams to create.

  • stream_stats – Stats to hook up to prometheus.

  • user_data – User data to pass to the chunk callback. It must have a field called stats_base, which will be filled in appropriately (modifying the argument).

  • max_heap_extra – Maximum non-payload data written by the place callback

  • kwargs – Other keyword arguments are passed to spead2.recv.StreamConfig.