katgpucbf.fgpu.send module

Network transmission handling.

class katgpucbf.fgpu.send.Batch(timestamp: ndarray, data: ndarray, saturated: ndarray, *, n_substreams: int, feng_id: int)

Bases: object

Holds all the heaps for a single timestamp.

It does not own its memory; the backing store is held by the Chunk.

Parameters:
  • timestamp – Zero-dimensional array of dtype >u8 holding the timestamp.

  • data – Payload data for the batch, of shape (channels, spectra_per_heap, N_POLS).

  • saturated – Saturation data for the batch, of shape (N_POLS,).

  • feng_id – Value to put in the feng_id SPEAD item.

  • n_substreams – Number of substreams into which the channels are divided.
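The statement that a batch does not own its memory can be illustrated with plain NumPy views. The following is a minimal sketch that does not use the katgpucbf classes: the sizes, the value of N_POLS, the int8 stand-in dtype, and the array names are all assumptions made for illustration.

```python
import numpy as np

N_POLS = 2  # assumed number of polarisations, for illustration only
n_batches, n_channels, spectra_per_heap = 4, 8, 16  # hypothetical sizes

# Chunk-level backing store (int8 is a stand-in for the real sample dtype).
chunk_data = np.zeros((n_batches, n_channels, spectra_per_heap, N_POLS), dtype=np.int8)
chunk_saturated = np.zeros((n_batches, N_POLS), dtype=np.uint32)
chunk_timestamps = np.zeros(n_batches, dtype=">u8")

# Per-batch arrays with the documented shapes, taken as views into the
# chunk-level arrays rather than as copies.
i = 1
timestamp = chunk_timestamps[i, ...]  # zero-dimensional array of dtype >u8
data = chunk_data[i]                  # shape (channels, spectra_per_heap, N_POLS)
saturated = chunk_saturated[i]        # shape (N_POLS,)

# Writing through a view updates the chunk-level storage.
timestamp[()] = 123456
```

Indexing with `[i, ...]` rather than `[i]` is what keeps the timestamp as a zero-dimensional array instead of a NumPy scalar, so later writes still reach the shared storage.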

class katgpucbf.fgpu.send.Chunk(data: ndarray, saturated: ndarray, *, n_substreams: int, feng_id: int, spectra_samples: int)

Bases: object

An array of batches, spanning multiple timestamps.

Parameters:
  • data – Storage for voltage data, with shape (n_batches, n_channels, n_spectra_per_heap, N_POLS) and a dtype returned by gaussian_dtype().

  • saturated – Storage for saturation counts, with shape (n_batches, N_POLS) and dtype uint32.

  • n_substreams – Number of substreams over which the data will be divided (must divide evenly into the number of channels).

  • feng_id – F-Engine ID to place in the SPEAD heaps.

  • spectra_samples – Difference in timestamps between successive batches.
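The array shapes and the divisibility requirement on n_substreams can be sketched with NumPy alone. This is only an illustration under stated assumptions: the structured dtype stands in for whatever gaussian_dtype() returns, the sizes are made up, and the contiguous-blocks-of-channels layout per substream is an assumption rather than something the documentation states.

```python
import numpy as np

N_POLS = 2  # assumed number of polarisations
n_batches, n_channels, n_spectra_per_heap = 2, 8, 4  # hypothetical sizes
n_substreams = 4

# Hypothetical stand-in for the dtype returned by gaussian_dtype().
sample_dtype = np.dtype([("real", np.int8), ("imag", np.int8)])

# Storage with the shapes described in the parameter list.
data = np.zeros((n_batches, n_channels, n_spectra_per_heap, N_POLS), dtype=sample_dtype)
saturated = np.zeros((n_batches, N_POLS), dtype=np.uint32)

# n_substreams must divide evenly into the number of channels.
if n_channels % n_substreams != 0:
    raise ValueError("n_substreams must divide evenly into the number of channels")
channels_per_substream = n_channels // n_substreams

# Assumed layout: each substream carries a contiguous block of channels.
# Reshaping a contiguous array returns a view, so no data is copied.
per_substream = data.reshape(
    n_batches, n_substreams, channels_per_substream, n_spectra_per_heap, N_POLS
)
```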

cleanup: Callable[[], None] | None

Callback to return the chunk to the appropriate queue.

present

Whether each batch has valid data.

async send(streams: list[spead2.send.asyncio.AsyncStream], batches: int, time_converter: TimeConverter, sensors: SensorSet, output_name: str) → None

Transmit heaps over SPEAD streams.

Batches from 0 to batches - 1 are sent asynchronously. The contents of each batch are distributed over the streams. If the number of streams does not divide evenly into the number of destination endpoints, the load on the streams will be imbalanced, because the partitioning is the same for every batch.
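The imbalance can be seen with a small counting exercise. The sketch below assumes a round-robin assignment of endpoints to streams; the documentation does not state the actual partitioning scheme, so `assign_round_robin` is a hypothetical helper, not part of katgpucbf.

```python
from collections import Counter


def assign_round_robin(n_endpoints: int, n_streams: int) -> list[int]:
    """Return the stream index used for each endpoint (hypothetical scheme)."""
    return [endpoint % n_streams for endpoint in range(n_endpoints)]


# 4 endpoints over 3 streams: 3 does not divide 4, so one stream gets more work.
assignment = assign_round_robin(n_endpoints=4, n_streams=3)
load = Counter(assignment)

# 4 endpoints over 2 streams: every stream carries the same number of endpoints.
balanced = Counter(assign_round_robin(n_endpoints=4, n_streams=2))
```

Under this assumed scheme stream 0 carries two endpoints while streams 1 and 2 carry one each. Because the same assignment is reused for every batch, the extra load never rotates to another stream.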

property timestamp: int

Timestamp of the first heap.

Setting this property updates the timestamps stored in all the heaps. This should not be done while a previous call to send() is still in progress.
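A sketch of what updating the timestamps involves, assuming that batch i carries the first timestamp plus i times spectra_samples (inferred from the description of spectra_samples as the difference between successive batches). The function name is hypothetical and this is not the library's implementation.

```python
import numpy as np


def batch_timestamps(first: int, n_batches: int, spectra_samples: int) -> np.ndarray:
    """Compute one big-endian 64-bit timestamp per batch (illustrative only)."""
    steps = np.arange(n_batches, dtype=np.uint64) * np.uint64(spectra_samples)
    return (np.uint64(first) + steps).astype(">u8")


timestamps = batch_timestamps(first=1000, n_batches=3, spectra_samples=256)
```

If the heaps refer to this storage directly, rewriting it while a send is still in flight could change data that has not yet been transmitted, which is consistent with the warning above.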

katgpucbf.fgpu.send.PREAMBLE_SIZE = 72

Number of non-payload bytes per packet (the header plus 8 item pointers).
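The value 72 is consistent with an 8-byte SPEAD header followed by 8 item pointers of 8 bytes each. The sketch below reproduces that arithmetic; the per-field sizes are assumptions based on the 64-bit SPEAD flavour, and `packet_size` is a hypothetical helper introduced only for illustration.

```python
HEADER_BYTES = 8        # assumed size of the SPEAD packet header
ITEM_POINTER_BYTES = 8  # assumed size of each item pointer
N_ITEM_POINTERS = 8     # number of item pointers, as stated above

PREAMBLE_SIZE = HEADER_BYTES + N_ITEM_POINTERS * ITEM_POINTER_BYTES


def packet_size(packet_payload: int) -> int:
    """Total SPEAD packet size in bytes for a given payload size (hypothetical helper)."""
    return PREAMBLE_SIZE + packet_payload


# Fraction of each packet spent on non-payload bytes for an 8192-byte payload.
overhead_fraction = PREAMBLE_SIZE / packet_size(8192)
```

This counts only the SPEAD-level bytes; lower-layer headers such as UDP, IP and Ethernet are not included.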

katgpucbf.fgpu.send.make_descriptor_heap(*, channels_per_substream: int, spectra_per_heap: int, sample_bits: int) → Heap

Create a descriptor heap for output F-Engine data.

katgpucbf.fgpu.send.make_item_group(**feng_raw_kwargs: Unpack[_FengRawKwargs]) → ItemGroup

Create an item group (without values).

The feng_raw_kwargs must specify the shape and dtype/format for the feng_raw item.

katgpucbf.fgpu.send.make_streams(*, output_name: str, thread_pool: ThreadPool, endpoints: list[Endpoint], interfaces: list[str], ttl: int, ibv: bool, packet_payload: int, comp_vector: int, buffer_size: int, bandwidth: float, send_rate_factor: float, feng_id: int, n_ants: int, n_data_heaps: int, chunks: Sequence[Chunk]) → list[spead2.send.asyncio.AsyncStream]

Create asynchronous SPEAD streams for transmission.

Each stream is configured with substreams for all the endpoints. There is one stream per network interface, and the streams differ only in the interface they use, so they can be used interchangeably for load balancing.