katgpucbf.xbgpu.bsend module
Module for sending tied array channelised voltage products onto the network.
- class katgpucbf.xbgpu.bsend.BSend(outputs: Sequence[BOutput], batches_per_chunk: int, n_chunks: int, n_channels: int, n_channels_per_substream: int, spectra_per_heap: int, adc_sample_rate: float, timestamp_step: int, send_rate_factor: float, channel_offset: int, context: AbstractContext, stream_factory: Callable[[StreamConfig, Sequence[ndarray]], spead2.send.asyncio.AsyncStream], packet_payload: int = 8192, send_enabled: bool = False)
Bases: Send
Class for turning tied array channelised voltage products into SPEAD heaps.
This class creates a queue of chunks that can be sent out onto the network. To obtain a chunk, call get_free_chunk(), which will return a Chunk. This object will create a limited number of transmit buffers and keep recycling them, avoiding any memory allocation at runtime.
The transmission of a chunk’s data is abstracted by send_chunk(). This invokes transmission and immediately returns the Chunk back to the queue for reuse.
This object keeps track of each tied-array-channelised-voltage data stream by means of substreams in spead2.send.asyncio.AsyncStream, allowing for individual enabling and disabling of the data product.
To allow this class to be used with multiple transports, the constructor takes a factory function to create the stream.
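The recycling scheme described above can be illustrated with a minimal, standard-library-only sketch. It is a toy model and not the katgpucbf implementation: ToyChunk, ToySender and the simulated transmit delay are hypothetical names introduced here, and no spead2 stream is involved. It only shows how a fixed pool of buffers can be handed out, "sent", and returned to the queue without allocating new memory.

```python
import asyncio


class ToyChunk:
    """Hypothetical stand-in for a chunk: a reusable buffer plus a send future."""

    def __init__(self, size):
        self.data = bytearray(size)
        # The future starts out done, so the chunk is immediately usable.
        self.future = asyncio.get_running_loop().create_future()
        self.future.set_result(None)


class ToySender:
    """Toy model of a fixed pool of chunks that is recycled, never reallocated."""

    def __init__(self, n_chunks, chunk_size):
        self._queue = asyncio.Queue()
        for _ in range(n_chunks):
            self._queue.put_nowait(ToyChunk(chunk_size))

    async def get_free_chunk(self):
        chunk = await self._queue.get()
        # Wait for any in-flight transmission that still uses this buffer.
        await chunk.future
        return chunk

    def send_chunk(self, chunk):
        # Start the (simulated) transmission and return the chunk to the
        # queue straight away; the future guards against premature reuse.
        chunk.future = asyncio.ensure_future(self._transmit(chunk))
        self._queue.put_nowait(chunk)

    async def _transmit(self, chunk):
        await asyncio.sleep(0.001)  # placeholder for network I/O


async def demo():
    sender = ToySender(n_chunks=2, chunk_size=16)
    distinct_chunks = set()
    for i in range(6):
        chunk = await sender.get_free_chunk()
        chunk.data[0] = i  # fill the buffer with new data
        distinct_chunks.add(id(chunk))
        sender.send_chunk(chunk)
    return len(distinct_chunks)
```

Running asyncio.run(demo()) cycles through six sends while only ever touching the two chunks created up front, which is the point of keeping the buffers in a queue.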
- Parameters:
outputs – Sequence of output.BOutput.
batches_per_chunk – Number of Batches in each transmitted Chunk.
n_chunks – Number of Chunks to create.
adc_sample_rate – See XBEngine for further information.
n_channels – See XBEngine for further information.
n_channels_per_substream – See XBEngine for further information.
spectra_per_heap – See XBEngine for further information.
channel_offset – See XBEngine for further information.
timestamp_step – The timestamp step between successive heaps.
send_rate_factor – Factor dictating how fast the send-stream should transmit data.
context – Device context to create buffers.
stream_factory – Callback function to create the spead2 send stream. It is passed the stream configuration and memory buffers.
packet_payload – Size, in bytes, for the output packets (tied array channelised voltage payload only; headers and padding are added to this).
send_enabled – Enable/Disable transmission.
- descriptor_heap: Heap
- enable_beam(beam_id: int, enable: bool = True, timestamp: int = 0) -> None
Enable/Disable a beam’s data transmission.
BSend operates as a single, large stream with multiple substreams. Each substream (beam) is its own data product and is required to be enabled/disabled independently.
- Parameters:
beam_id – Index of the beam’s data product.
enable – Boolean indicating whether the beam identified by beam_id should be enabled (True) or disabled (False).
timestamp – Minimum timestamp to transmit when enabled.
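As a rough illustration of per-beam gating, the sketch below keeps an enabled flag and a minimum timestamp for each beam. ToyBeamGate and beams_to_send are hypothetical names, and the sketch assumes that "minimum timestamp" means data with an earlier timestamp is held back for that beam. The real class may apply the timestamp differently.

```python
class ToyBeamGate:
    """Toy model of independent enable/disable state for each beam."""

    def __init__(self, n_beams, send_enabled=False):
        self._enabled = [send_enabled] * n_beams
        self._min_timestamp = [0] * n_beams

    def enable_beam(self, beam_id, enable=True, timestamp=0):
        # Record the new state; other beams are left untouched.
        self._enabled[beam_id] = enable
        self._min_timestamp[beam_id] = timestamp

    def beams_to_send(self, timestamp):
        """Return indices of beams that would transmit data at `timestamp`.

        Assumption: a beam transmits only if it is enabled and the data
        timestamp has reached the minimum given when it was enabled.
        """
        return [
            beam_id
            for beam_id, enabled in enumerate(self._enabled)
            if enabled and timestamp >= self._min_timestamp[beam_id]
        ]


gate = ToyBeamGate(n_beams=3)
gate.enable_beam(1, timestamp=100)
early = gate.beams_to_send(50)    # beam 1 is enabled, but not before 100
on_time = gate.beams_to_send(100)
gate.enable_beam(0)
both = gate.beams_to_send(100)
gate.enable_beam(1, enable=False)
after_disable = gate.beams_to_send(100)
```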
- async get_free_chunk() -> Chunk
Obtain a Chunk for transmission.
We await the chunk’s future to be sure we are not overwriting data that is still being transmitted. If sending failed, it is no longer being transmitted, and therefore safe to return the chunk.
- Raises:
asyncio.CancelledError – If the chunk’s send future is cancelled.
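The error handling described here can be sketched with plain asyncio futures. The helper name wait_until_reusable is hypothetical and the code is not taken from katgpucbf. It only shows one way to treat a failed send as "safe to reuse" while still letting cancellation propagate.

```python
import asyncio


async def wait_until_reusable(future):
    """Wait for a send future; swallow send failures, propagate cancellation."""
    try:
        await future
    except asyncio.CancelledError:
        # Cancellation is passed on to the caller.
        raise
    except Exception:
        # The send failed, so nothing is in flight any more and the
        # buffer can be reused.
        pass


async def demo():
    loop = asyncio.get_running_loop()

    failed = loop.create_future()
    failed.set_exception(OSError("simulated send failure"))
    await wait_until_reusable(failed)  # returns normally

    cancelled = loop.create_future()
    cancelled.cancel()
    try:
        await wait_until_reusable(cancelled)
    except asyncio.CancelledError:
        return "cancelled propagated"
    return "no error"
```

Since Python 3.8, asyncio.CancelledError derives from BaseException, so the broad except Exception clause would not catch it anyway. The explicit re-raise is there to make the intent visible.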
- class katgpucbf.xbgpu.bsend.Batch(timestamp: ndarray, data: ndarray, *, channel_offset: int, present_ants: ndarray)
Bases: object
Hold all data for heaps with a single timestamp.
It does not own its memory - the backing store is in Chunk. It keeps a cached spead2.send.HeapReferenceList with the heaps of the enabled beams, along with a tuple of the enabled beams.
- Parameters:
timestamp – Zero-dimensional array of dtype >u8 holding the timestamp.
data – Payload data for the batch with shape (n_beams, n_channels_per_substream, spectra_per_heap, COMPLEX).
channel_offset – The first frequency channel processed.
present_ants – Zero-dimensional array of dtype >u8 holding the number of antennas present in the Batch’s input data.
- class katgpucbf.xbgpu.bsend.Chunk(data: ndarray, saturated: ndarray, *, channel_offset: int, timestamp_step: int)
Bases: object
An array of Batches.
- Parameters:
data – Storage for tied-array-channelised-voltage data, with shape (n_batches, n_beams, n_channels_per_substream, n_spectra_per_heap, COMPLEX) and dtype SEND_DTYPE.
saturated – Storage for saturation counts, with shape (n_beams,) and dtype uint32.
channel_offset – The first frequency channel processed.
timestamp_step – Timestamp step between successive Batches in a chunk.
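The relationship between a chunk and its batches can be sketched with the standard library alone. This is a simplified model under stated assumptions: ToyChunk, its flat byte layout and set_timestamp are hypothetical, and the sketch assumes that batch i carries the chunk's first timestamp plus i times timestamp_step. The real classes use multi-dimensional arrays rather than memoryview slices.

```python
import struct


class ToyChunk:
    """Toy chunk: one backing buffer, with per-batch views into it."""

    def __init__(self, n_batches, batch_bytes, timestamp_step):
        self.data = bytearray(n_batches * batch_bytes)  # single backing store
        self.timestamp_step = timestamp_step
        view = memoryview(self.data)
        # Each batch is a slice of the same memory; nothing is copied.
        self.batches = [
            view[i * batch_bytes:(i + 1) * batch_bytes] for i in range(n_batches)
        ]
        self.timestamps = [0] * n_batches

    def set_timestamp(self, first_timestamp):
        # Assumption: successive batches are spaced by timestamp_step.
        self.timestamps = [
            first_timestamp + i * self.timestamp_step
            for i in range(len(self.batches))
        ]


chunk = ToyChunk(n_batches=3, batch_bytes=4, timestamp_step=1024)
chunk.batches[1][0] = 7  # writing through a batch view...
shared_byte = chunk.data[4]  # ...changes the chunk's own storage
chunk.set_timestamp(5000)

# A dtype of ">u8" is an 8-byte big-endian unsigned integer, which the
# struct module spells ">Q".
encoded = struct.pack(">Q", chunk.timestamps[1])
```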
- property present_ants: ndarray
Number of antennas present in the current beam sums.
This is a count for each Batch in the chunk. Setting this property updates the immediate SPEAD items in the heaps. Much like timestamp, this should only be done when future is done.
- send(send_stream: BSend, time_converter: TimeConverter, sensors: SensorSet) -> Future
Transmit a chunk’s heaps over a SPEAD stream.
This method returns immediately and sends the data asynchronously. Before modifying the chunk, first await future.
- katgpucbf.xbgpu.bsend.make_item_group(bf_raw_shape: tuple[int, ...]) -> ItemGroup
Create an item group (with no values).
- katgpucbf.xbgpu.bsend.make_stream(*, output_names: list[str], endpoints: list[Endpoint], interface: str, ttl: int, use_ibv: bool, affinity: int, buffer_size: int, comp_vector: int, stream_config: StreamConfig, buffers: Sequence[ndarray]) -> spead2.send.asyncio.AsyncStream
Create asynchronous SPEAD stream for transmission.
This is architected to be a single send stream with multiple substreams, each corresponding to a tied-array-channelised-voltage output data product. The endpoints need not be a contiguous list of multicast addresses.
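Because make_stream takes keyword-only arguments while the stream_factory passed to BSend is called with the stream configuration and buffers, some adapter is needed to join the two. The sketch below shows one possible shape for such an adapter using a stub. make_stream_stub and build_factory are hypothetical names, the stub returns a plain dict instead of a spead2 stream, and the addresses are made-up illustration values.

```python
def make_stream_stub(*, output_names, endpoints, interface, ttl,
                     stream_config, buffers):
    """Hypothetical stand-in for a keyword-only stream constructor.

    It returns a dict describing what would be created, so the example
    runs without spead2 or a network interface.
    """
    return {
        "substreams": dict(zip(output_names, endpoints)),
        "interface": interface,
        "ttl": ttl,
        "stream_config": stream_config,
        "n_buffers": len(buffers),
    }


def build_factory(**transport_kwargs):
    """Bind the transport settings now; leave config and buffers for later."""

    def factory(stream_config, buffers):
        # The caller passes these two positionally; forward them by keyword.
        return make_stream_stub(
            stream_config=stream_config, buffers=buffers, **transport_kwargs
        )

    return factory


factory = build_factory(
    output_names=["beam0", "beam1"],
    # Endpoints need not be contiguous; these values are illustrative only.
    endpoints=[("239.10.10.10", 7148), ("239.10.10.20", 7148)],
    interface="10.0.0.1",
    ttl=4,
)
stream = factory("config-placeholder", [bytearray(8), bytearray(8)])
```

A closure is used rather than functools.partial because a partial over keyword-only parameters would still require the caller to pass stream_config and buffers by keyword.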