katgpucbf.fgpu.send module
Network transmission handling.
- class katgpucbf.fgpu.send.Batch(timestamp: ndarray, data: ndarray, saturated: ndarray, *, n_substreams: int, feng_id: int)[source]
Bases: object
Holds all the heaps for a single timestamp.
It does not own its memory - the backing store is in Chunk.
- Parameters:
timestamp – Zero-dimensional array of dtype >u8 holding the timestamp.
data – Payload data for the batch, of shape (channels, spectra_per_heap, N_POLS).
saturated – Saturation data for the batch, of shape (N_POLS,).
feng_id – Value to put in the feng_id SPEAD item.
n_substreams – Number of substreams into which the channels are divided.
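The array arguments can be illustrated with plain NumPy. This is only a sketch of the documented shapes and dtypes, not of how katgpucbf allocates them: the value N_POLS = 2, the sizes, and the structured sample dtype are assumptions made for illustration.

```python
import numpy as np

N_POLS = 2                          # assumption: two polarisations
channels, spectra_per_heap = 8, 4   # illustrative sizes only

# Assumption: a stand-in payload dtype with integer real/imag parts.
sample_dtype = np.dtype([("real", np.int8), ("imag", np.int8)])

# Zero-dimensional, big-endian unsigned 64-bit timestamp.
timestamp = np.zeros((), dtype=">u8")
timestamp[()] = 123456

# Payload and saturation arrays with the documented shapes.
data = np.zeros((channels, spectra_per_heap, N_POLS), dtype=sample_dtype)
saturated = np.zeros((N_POLS,), dtype=np.uint32)
```

The big-endian dtype means the eight bytes of the timestamp array are already in network byte order.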
- class katgpucbf.fgpu.send.Chunk(data: ndarray, saturated: ndarray, *, n_substreams: int, feng_id: int, spectra_samples: int)[source]
Bases: object
An array of batches, spanning multiple timestamps.
- Parameters:
data – Storage for voltage data, with shape (n_batches, n_channels, n_spectra_per_heap, N_POLS) and a dtype returned by gaussian_dtype().
saturated – Storage for saturation counts, with shape (n_batches, N_POLS) and dtype uint32.
n_substreams – Number of substreams over which the data will be divided (must divide evenly into the number of channels).
feng_id – F-Engine ID to place in the SPEAD heaps
spectra_samples – Difference in timestamps between successive batches
- present
Whether each batch has valid data
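The relationship between a chunk and its batches can be sketched with NumPy views. This is not the katgpucbf implementation: the sizes, the starting timestamp, the stand-in for the dtype returned by gaussian_dtype(), and the representation of present as a boolean array are all assumptions.

```python
import numpy as np

N_POLS = 2                                       # assumption
n_batches, n_channels, n_spectra_per_heap = 4, 16, 8
n_substreams = 4
spectra_samples = 4096                           # hypothetical timestamp step

# Assumption: stand-in for the dtype that gaussian_dtype() would return.
sample_dtype = np.dtype([("real", np.int8), ("imag", np.int8)])

# Chunk-level storage with the documented shapes.
chunk_data = np.zeros(
    (n_batches, n_channels, n_spectra_per_heap, N_POLS), dtype=sample_dtype
)
chunk_saturated = np.zeros((n_batches, N_POLS), dtype=np.uint32)
present = np.zeros(n_batches, dtype=bool)        # assumption: one flag per batch

# n_substreams must divide evenly into the number of channels.
assert n_channels % n_substreams == 0
channels_per_substream = n_channels // n_substreams

# Successive batches are spectra_samples apart in time.
first_timestamp = 1_000_000                      # hypothetical
timestamps = first_timestamp + spectra_samples * np.arange(n_batches, dtype=np.uint64)

# Each batch is a view into the chunk, so it owns no memory of its own.
batch_views = [chunk_data[i] for i in range(n_batches)]
batch_views[2]["real"][0, 0, 0] = 7              # writes through to chunk_data
```

Because the per-batch arrays are views, filling in the chunk's storage is enough to update every batch without copying.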
- async send(streams: list[spead2.send.asyncio.AsyncStream], batches: int, time_converter: TimeConverter, sensors: SensorSet, output_name: str) → None[source]
Transmit heaps over SPEAD streams.
Batches from 0 to batches - 1 are sent asynchronously. The contents of each batch are distributed over the streams. If the number of streams does not divide evenly into the number of destination endpoints, the load will be imbalanced, because the partitioning is the same for every batch.
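The imbalance can be seen with a little arithmetic. The sketch below assumes the endpoints are split into contiguous groups, one group per stream; the actual partitioning used by send is not stated here, and partition_counts is a hypothetical helper.

```python
def partition_counts(n_endpoints: int, n_streams: int) -> list[int]:
    """Split n_endpoints into n_streams contiguous groups and return the group sizes."""
    base, extra = divmod(n_endpoints, n_streams)
    # The first `extra` streams each take one additional endpoint.
    return [base + (1 if i < extra else 0) for i in range(n_streams)]


balanced = partition_counts(8, 2)     # [4, 4]
unbalanced = partition_counts(8, 3)   # [3, 3, 2]
```

Since the same partition is reused for every batch, the streams holding the larger groups carry more traffic for the whole run rather than the excess averaging out over time.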
- katgpucbf.fgpu.send.PREAMBLE_SIZE = 72
Number of non-payload bytes per packet (header, 8 item pointers)
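The value 72 is consistent with an 8-byte SPEAD header followed by eight 8-byte item pointers. The sketch below reproduces that arithmetic; the constant names and the example packet_payload value are illustrative assumptions, not identifiers from the module.

```python
HEADER_BYTES = 8          # SPEAD header (64-bit flavour)
ITEM_POINTER_BYTES = 8    # each item pointer is one 64-bit word
N_ITEM_POINTERS = 8

preamble_size = HEADER_BYTES + N_ITEM_POINTERS * ITEM_POINTER_BYTES

packet_payload = 8192     # hypothetical payload size in bytes
packet_size = preamble_size + packet_payload   # SPEAD packet size, excluding UDP/IP/Ethernet headers
```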
- katgpucbf.fgpu.send.make_descriptor_heap(*, channels_per_substream: int, spectra_per_heap: int, sample_bits: int) → Heap[source]
Create a descriptor heap for output F-Engine data.
- katgpucbf.fgpu.send.make_item_group(**feng_raw_kwargs: Unpack[_FengRawKwargs]) → ItemGroup[source]
Create an item group (without values).
The feng_raw_kwargs must specify the shape and dtype/format for the feng_raw item.
- katgpucbf.fgpu.send.make_streams(*, output_name: str, thread_pool: ThreadPool, endpoints: list[Endpoint], interfaces: list[str], ttl: int, ibv: bool, packet_payload: int, comp_vector: int, buffer_size: int, bandwidth: float, send_rate_factor: float, feng_id: int, n_ants: int, n_data_heaps: int, chunks: Sequence[Chunk]) → list[spead2.send.asyncio.AsyncStream][source]
Create asynchronous SPEAD streams for transmission.
Each stream is configured with substreams for all the end-points. They differ only in the network interface used (there is one per interface). Thus, they can be used interchangeably for load-balancing purposes.