katgpucbf.xbgpu.bsend module

Module for sending tied array channelised voltage products onto the network.

class katgpucbf.xbgpu.bsend.BSend(outputs: Sequence[BOutput], batches_per_chunk: int, n_chunks: int, n_channels: int, n_channels_per_substream: int, spectra_per_heap: int, adc_sample_rate: float, timestamp_step: int, send_rate_factor: float, channel_offset: int, context: AbstractContext, stream_factory: Callable[[StreamConfig, Sequence[ndarray]], spead2.send.asyncio.AsyncStream], packet_payload: int = 8192, send_enabled: bool = False)[source]

Bases: Send

Class for turning tied array channelised voltage products into SPEAD heaps.

This class creates a queue of chunks that can be sent out onto the network. To obtain a chunk, call get_free_chunk() - which will return a Chunk. This object will create a limited number of transmit buffers and keep recycling them, avoiding any memory allocation at runtime.

The transmission of a chunk’s data is abstracted by send_chunk(). This invokes transmission and immediately returns the Chunk back to the queue for reuse.

This object keeps track of each tied-array-channelised-voltage data stream by means of substreams in spead2.send.asyncio.AsyncStream, allowing for individual enabling and disabling of the data product.

To allow this class to be used with multiple transports, the constructor takes a factory function to create the stream.

Parameters:
  • outputs – Sequence of output.BOutput.

  • batches_per_chunk – Number of Batches in each transmitted Chunk.

  • n_chunks – Number of Chunks to create.

  • adc_sample_rate – See XBEngine for further information.

  • n_channels – See XBEngine for further information.

  • n_channels_per_substream – See XBEngine for further information.

  • spectra_per_heap – See XBEngine for further information.

  • channel_offset – See XBEngine for further information.

  • timestamp_step – The timestamp step between successive heaps.

  • send_rate_factor – Factor dictating how fast the send-stream should transmit data.

  • context – Device context to create buffers.

  • stream_factory – Callback function to create the spead2 send stream. It is passed the stream configuration and memory buffers.

  • packet_payload – Size, in bytes, for the output packets (tied array channelised voltage payload only; headers and padding are added to this).

  • send_enabled – Enable/Disable transmission.

descriptor_heap: Heap
enable_beam(beam_id: int, enable: bool = True, timestamp: int = 0) None[source]

Enable/Disable a beam’s data transmission.

BSend operates as a single, large stream with multiple substreams. Each substream (beam) is its own data product and is required to be enabled/disabled independently.

Parameters:
  • beam_id – Index of the beam’s data product.

  • enable – Boolean indicating whether the beam_id should be enabled or disabled.

  • timestamp – Minimum timestamp to transmit when enabled.

async get_free_chunk() Chunk[source]

Obtain a Chunk for transmission.

We await the chunk’s future to be sure we are not overwriting data that is still being transmitted. If sending failed, it is no longer being transmitted, and therefore safe to return the chunk.

Raises:

asyncio.CancelledError – If the chunk’s send future is cancelled.

preamble_size: Final[int] = 72
send_chunk(chunk: Chunk, time_converter: TimeConverter, sensors: SensorSet) None[source]

Send a chunk’s data and put it on the _chunks_queue.

async send_stop_heap() None[source]

Send a Stop Heap over the spead2 transport.

class katgpucbf.xbgpu.bsend.Batch(timestamp: ndarray, data: ndarray, *, channel_offset: int, present_ants: ndarray)[source]

Bases: object

Hold all data for heaps with a single timestamp.

It does not own its memory - the backing store is in Chunk. It keeps a cached spead2.send.HeapReferenceList with the heaps of the enabled beams, along with a tuple of the enabled beams.

Parameters:
  • timestamp – Zero-dimensional array of dtype >u8 holding the timestamp

  • data – Payload data for the batch with shape (n_beams, n_channels_per_substream, spectra_per_heap, COMPLEX).

  • channel_offset – The first frequency channel processed.

  • present_ants – Zero-dimensional array of dtype >u8 holding the number of antennas present in the Batch’s input data.

class katgpucbf.xbgpu.bsend.Chunk(data: ndarray, saturated: ndarray, *, channel_offset: int, timestamp_step: int)[source]

Bases: object

An array of Batches.

Parameters:
  • data – Storage for tied-array-channelised-voltage data, with shape (n_batches, n_beams, n_channels_per_substream, n_spectra_per_heap, COMPLEX) and dtype SEND_DTYPE.

  • saturated – Storage for saturation counts, with shape (n_beams,) and dtype uint32.

  • channel_offset – The first frequency channel processed.

  • timestamp_step – Timestamp step between successive Batches in a chunk.

property present_ants: ndarray

Number of antennas present in the current beam sums.

This is a count for each Batch in the chunk. Setting this property updates the immediate SPEAD items in the heaps. Much like timestamp, this should only be done when future is done.

send(send_stream: BSend, time_converter: TimeConverter, sensors: SensorSet) Future[source]

Transmit a chunk’s heaps over a SPEAD stream.

This method returns immediately and sends the data asynchronously. Before modifying the chunk, first await future.

property timestamp: int

Timestamp of the first heap.

Setting this property updates the timestamps stored in all the heaps. This should only be done when future is done.

katgpucbf.xbgpu.bsend.make_item_group(bf_raw_shape: tuple[int, ...]) ItemGroup[source]

Create an item group (with no values).

katgpucbf.xbgpu.bsend.make_stream(*, output_names: list[str], endpoints: list[Endpoint], interface: str, ttl: int, use_ibv: bool, affinity: int, buffer_size: int, comp_vector: int, stream_config: StreamConfig, buffers: Sequence[ndarray]) spead2.send.asyncio.AsyncStream[source]

Create asynchronous SPEAD stream for transmission.

This is architected to be a single send stream with multiple substreams, each corresponding to a tied-array-channelised-voltage output data product. The endpoints need not be a contiguous list of multicast addresses.