katgpucbf.xbgpu.engine module

Define the objects that implement an entire GPU XB-Engine pipeline.

The XBEngine comprises multiple Pipeline objects that facilitate output data products. Additionally, several subclasses of QueueItem are defined for passing data between parts of the engine.

class katgpucbf.xbgpu.engine.BOutQueueItem(out: DeviceArray, saturated: DeviceArray, present: ndarray, weights: MappedArray, delays: MappedArray, timestamp: int = 0)

Bases: QueueItem

Transmit queue item for the beamformer pipeline.

The out, saturated, rand_states, weights and delays must have the same shape and dtype as the corresponding slots in Beamform. This class needs to carry a record of which antennas have missed data in the current Chunk of data. This record is used to determine whether any beam data has been affected, so that the affected data can be flagged accordingly.

Parameters:
  • out – An int8 type DeviceArray with shape (batches, ants, channels, spectra_per_batch, N_POLS, COMPLEX).

  • saturated – A uint32 type DeviceArray with shape (n_beams,).

  • present – A boolean array with shape (heaps_per_fengine_per_chunk, n_ants).

  • weights – An np.complex64 MappedArray with shape (n_ants, n_beams).

  • delays – An np.float32 MappedArray with shape (n_ants, n_beams).

  • timestamp (int) – ADC sample count of the received Chunk.

Todo

Potentially create (yet another) base class for B- and XOutQueueItems.

reset(timestamp: int = 0) → None

Reset the timestamp and the present antenna tracker.

weights_version

Version of weights and delays (for comparison to BPipeline._weights_version)
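A version counter like this allows a cheap staleness check before weights and delays are copied into an item. The following is a minimal sketch of that idea only, not the katgpucbf implementation: ItemSketch, PipelineSketch and refresh() are hypothetical names, and plain lists stand in for the MappedArray buffers.

```python
from dataclasses import dataclass, field


@dataclass
class ItemSketch:
    """Hypothetical stand-in for BOutQueueItem (lists instead of MappedArrays)."""

    weights: list[float] = field(default_factory=list)
    weights_version: int = -1  # version of the weights last copied into this item


@dataclass
class PipelineSketch:
    """Hypothetical stand-in for the pipeline's weight bookkeeping."""

    weights: list[float]
    weights_version: int = 0

    def set_weights(self, new_weights: list[float]) -> None:
        self.weights = list(new_weights)
        self.weights_version += 1  # every update makes older item copies stale

    def refresh(self, item: ItemSketch) -> bool:
        """Copy weights into the item only if its copy is stale; return True if copied."""
        if item.weights_version == self.weights_version:
            return False
        item.weights = list(self.weights)
        item.weights_version = self.weights_version
        return True


pipeline = PipelineSketch(weights=[1.0, 1.0])
item = ItemSketch()
first = pipeline.refresh(item)   # stale: weights are copied
second = pipeline.refresh(item)  # up to date: copy is skipped
pipeline.set_weights([0.5, 2.0])
third = pipeline.refresh(item)   # stale again: weights are copied
```

Comparing two integers is much cheaper than comparing or re-copying whole arrays, which is presumably why a version number is tracked per item.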

class katgpucbf.xbgpu.engine.BPipeline(outputs: Sequence[BOutput], engine: XBEngine, context: AbstractContext, vkgdr_handle: vkgdr.Vkgdr, init_send_enabled: bool, name: str = 'bpipeline')

Bases: Pipeline[BOutput, BOutQueueItem]

Processing pipeline for a collection of output.BOutput.

capture_enable(*, stream_id: int, enable: bool = True, timestamp: int = 0) → None

Enable/Disable the transmission of a data product’s stream.

async gpu_proc_loop() → None

Perform all GPU processing of received data in a continuous loop.

This method does the following:

  • Get an InQueueItem off the in_queue

  • Ensure it is not a NoneType value (indicating shutdown sequence)

  • await any outstanding events associated with the InQueueItem

  • Apply GPU processing to data in the InQueueItem

    • Bind input buffer(s) accordingly

  • Obtain a free QueueItem from the out_free_queue

    • Add event marker to wait for the proc_command_queue

    • Put the prepared QueueItem on the out_queue

NOTE: An initial QueueItem needs to be obtained from the out_free_queue for the first round of processing:

  • The gpu_proc_loop requires logic to decipher the timestamp of the first heap output.

  • It also provides an opportunity to bind buffers before processing is queued.

async sender_loop() → None

Send heaps to the network in a continuous loop.

This method does the following:

  • Get a QueueItem from the out_queue

  • Ensure it is not a NoneType value (indicating shutdown sequence)

  • Wait for events on the item to complete (likely GPU processing)

  • Wait for an available heap buffer from the send_stream

  • Asynchronously transfer (download) GPU buffer data into the heap buffer in system RAM

    • Wait for the transfer to complete before transmitting

  • Transmit heap buffer onto the network

  • Place the QueueItem back on the out_free_queue once complete

set_delays(stream_id: int, delays: ndarray) → int

Set the beam steering delays for one beam.

Parameters:
  • stream_id – The index of the beam whose delays are being set.

  • delays – A 2D array of delay coefficients. The first axis corresponds to inputs. The second axis has length two, with the first element containing the delay in seconds, and the second containing a channel-independent phase rotation in radians.

Returns:

_weights_steady, the timestamp that includes the effect of a beam-delay update made now.

Return type:

int

set_quant_gain(stream_id: int, gain: float) → int

Set the quantisation gain for one beam.

Parameters:
  • stream_id – The index of the beam whose quantisation gain is being set.

  • gain – Real-valued quantisation gain.

Returns:

_weights_steady, the timestamp that includes the effect of a beam-quant-gain update made now.

Return type:

int

set_weights(stream_id: int, weights: ndarray) → int

Set the beam weights for one beam.

Parameters:
  • stream_id – The index of the beam whose weights are being set.

  • weights – A 1D array containing real-valued weights (per input).

Returns:

_weights_steady, the timestamp that includes the effect of a beam-weights update made now.

Return type:

int

class katgpucbf.xbgpu.engine.InQueueItem(buffer_device: DeviceArray, present: ndarray, timestamp: int = 0)

Bases: QueueItem

Extension of the QueueItem for use in receive queues.

The InQueueItem holds a reference to the received Chunk because its data is copied into the GPU buffer asynchronously. Once the GPU processing loop is done with the Chunk’s data, it hands it back to the receiving loop to reuse the resource. It is for this reason that the Chunk’s heap presence is stored separately.

The InQueueItem also holds a reference count of the number of pipelines still using this item. This is to ensure the item isn’t freed before each pipeline has had a chance to use the received data.

reset(timestamp: int = 0) → None

Reset the timestamp and chunk.

class katgpucbf.xbgpu.engine.Pipeline(outputs: Sequence, name: str, engine: XBEngine, context: AbstractContext)

Bases: Generic

Base Pipeline class to build on.

SPEAD heaps utilised by this pipeline are first received at the XBEngine-level and uploaded to the GPU for processing.

The pipeline then performs GPU-accelerated processing on the uploaded data before packetising and sending the results back out onto the network.

Data processing occurs across three separate async methods. Data is passed between these methods using asyncio.Queues. See each method for more details.

Items passed between queues may still have GPU operations in progress. QueueItem provides mechanisms to wait for the in-progress work to complete.

Parameters:
  • outputs – Sequence of Output config for data product (BOutput or XOutput).

  • name – Name of Pipeline.

  • engine – The owning engine.

  • context – CUDA context for device work.

add_in_item(item: InQueueItem) → None

Append a newly-received InQueueItem to the _in_queue.

abstractmethod capture_enable(*, stream_id: int, enable: bool = True, timestamp: int = 0) → None

Enable/Disable the transmission of a data product’s stream.

abstractmethod async gpu_proc_loop() → None

Perform all GPU processing of received data in a continuous loop.

This method does the following:

  • Get an InQueueItem off the in_queue

  • Ensure it is not a NoneType value (indicating shutdown sequence)

  • await any outstanding events associated with the InQueueItem

  • Apply GPU processing to data in the InQueueItem

    • Bind input buffer(s) accordingly

  • Obtain a free QueueItem from the out_free_queue

    • Add event marker to wait for the proc_command_queue

    • Put the prepared QueueItem on the out_queue

NOTE: An initial QueueItem needs to be obtained from the out_free_queue for the first round of processing:

  • The gpu_proc_loop requires logic to decipher the timestamp of the first heap output.

  • It also provides an opportunity to bind buffers before processing is queued.
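The queue hand-off described in the steps above can be sketched with plain asyncio queues. This is an illustrative sketch under stated assumptions, not the katgpucbf implementation: items are dictionaries, the "GPU work" is a list comprehension, event handling is omitted, and forwarding the None sentinel downstream is an assumption about how shutdown propagates.

```python
import asyncio


async def gpu_proc_loop_sketch(in_queue, out_free_queue, out_queue):
    """Hypothetical processing loop; 'processing' just doubles each input value."""
    while True:
        in_item = await in_queue.get()
        if in_item is None:  # None signals the shutdown sequence
            await out_queue.put(None)  # assumption: pass the sentinel downstream
            break
        out_item = await out_free_queue.get()  # waits until an output item is free
        out_item["timestamp"] = in_item["timestamp"]
        out_item["data"] = [2 * x for x in in_item["data"]]  # stand-in for GPU work
        await out_queue.put(out_item)


async def run_demo():
    in_queue = asyncio.Queue()
    out_free_queue = asyncio.Queue()
    out_queue = asyncio.Queue()
    for _ in range(2):  # two reusable output items
        out_free_queue.put_nowait({"timestamp": 0, "data": []})
    in_queue.put_nowait({"timestamp": 100, "data": [1, 2]})
    in_queue.put_nowait(None)
    await gpu_proc_loop_sketch(in_queue, out_free_queue, out_queue)
    results = []
    while not out_queue.empty():
        results.append(out_queue.get_nowait())
    return results


demo_results = asyncio.run(run_demo())
```

Because output items come from a fixed pool, the loop naturally stalls when the sender falls behind, which keeps memory use bounded.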

n_in_items = 3

n_out_items = 2

send_stream: Send

abstractmethod async sender_loop() → None

Send heaps to the network in a continuous loop.

This method does the following:

  • Get a QueueItem from the out_queue

  • Ensure it is not a NoneType value (indicating shutdown sequence)

  • Wait for events on the item to complete (likely GPU processing)

  • Wait for an available heap buffer from the send_stream

  • Asynchronously transfer (download) GPU buffer data into the heap buffer in system RAM

    • Wait for the transfer to complete before transmitting

  • Transmit heap buffer onto the network

  • Place the QueueItem back on the out_free_queue once complete
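The sending side can be sketched in the same style. Again, this is a hypothetical illustration rather than the real code: the device-to-host copy is a list copy, "transmission" appends to a list, and waiting for events and heap buffers is left out.

```python
import asyncio


async def sender_loop_sketch(out_queue, out_free_queue, sent):
    """Hypothetical sender loop that recycles each item after 'transmitting' it."""
    while True:
        item = await out_queue.get()
        if item is None:  # None signals the shutdown sequence
            break
        heap = list(item["data"])  # stand-in for the device-to-host transfer
        sent.append((item["timestamp"], heap))  # stand-in for network transmission
        await out_free_queue.put(item)  # hand the item back for reuse


async def run_sender_demo():
    out_queue = asyncio.Queue()
    out_free_queue = asyncio.Queue()
    sent = []
    out_queue.put_nowait({"timestamp": 7, "data": [1, 2, 3]})
    out_queue.put_nowait(None)
    await sender_loop_sketch(out_queue, out_free_queue, sent)
    return sent, out_free_queue.qsize()


sent_heaps, n_recycled = asyncio.run(run_sender_demo())
```

Returning the item to the free queue only after transmission ensures the processing loop never overwrites a buffer that is still being sent.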

shutdown() → None

Start a graceful shutdown after the final call to add_in_item().

class katgpucbf.xbgpu.engine.XBEngine(*, katcp_host: str, katcp_port: int, adc_sample_rate: float, bandwidth: float, send_rate_factor: float, n_ants: int, n_channels: int, n_channels_per_substream: int, n_samples_between_spectra: int, n_spectra_per_heap: int, sample_bits: int, sync_time: float, channel_offset_value: int, outputs: list[Output], src: list[tuple[str, int]], recv_interface: str, recv_ibv: bool, recv_affinity: int, recv_comp_vector: int, recv_buffer: int, send_interface: str, send_buffer: int, send_ttl: int, send_ibv: bool, send_packet_payload: int, send_affinity: int, send_comp_vector: int, heaps_per_fengine_per_chunk: int, recv_reorder_tol: int, send_enabled: bool, monitor: Monitor, context: AbstractContext, vkgdr_handle: vkgdr.Vkgdr)

Bases: Engine

GPU XB-Engine pipeline.

Currently the B-Engine functionality has not been added. This class currently only creates an X-Engine pipeline.

The XB-Engine receives SPEAD heaps from F-Engines and makes the data available to the constituent pipelines. To reduce the load on the main thread, received data is collected into chunks. A chunk consists of multiple batches of F-Engine heaps, where a batch is a collection of heaps from all F-Engines with the same timestamp.

There is a separate function for sending descriptors onto the network.

Class initialisers allocate all memory buffers to be used during the lifetime of the XBEngine object. These buffers are continuously reused to ensure memory use remains constrained.

Todo

A lot of the sensors are common to both the F- and X-engines. It may be worth investigating some kind of abstract base class for engines to build on top of.

Parameters:
  • katcp_host – Hostname or IP on which to listen for KATCP C&M connections.

  • katcp_port – Network port on which to listen for KATCP C&M connections.

  • adc_sample_rate – Sample rate of the digitisers in the current array (in Hz). This value is required to calculate the packet spacing of the output heaps. If it is set incorrectly, the packet spacing could be too large, causing the pipeline to stall as heaps queue at the sender faster than they are sent.

  • bandwidth – Total bandwidth across n_channels channels (in Hz). This is used in delay calculations.

  • send_rate_factor – Configure the spead2 sender with a rate proportional to this factor. This value is intended to dictate a data transmission rate slightly higher/faster than the ADC rate. A factor of zero (0) tells the sender to transmit as fast as possible.

  • n_ants – The number of antennas to be correlated.

  • n_channels – The total number of frequency channels out of the F-Engine.

  • n_channels_per_substream – The number of frequency channels contained in the incoming F-engine data stream.

  • n_samples_between_spectra – The number of samples between frequency spectra received.

  • n_spectra_per_heap – The number of time samples received per frequency channel.

  • sample_bits – The number of bits per sample. Only 8 bits is supported at the moment.

  • sync_time – UNIX time corresponding to timestamp zero.

  • channel_offset_value – The index of the first channel in the subset of channels processed by this XB-Engine. Used to set the value in the XB-Engine output heaps for spectrum reassembly by the downstream receiver.

  • outputs – Output streams to generate.

  • src – Endpoint for the incoming data.

  • recv_interface – IP address of the network device to use for input.

  • recv_ibv – Use ibverbs for input.

  • recv_affinity – Specific CPU core to assign the receive stream processing thread to.

  • recv_comp_vector – Completion vector for source stream, or -1 for polling. See spead2.recv.UdpIbvConfig for further information.

  • recv_buffer – The size of the network receive buffer.

  • heaps_per_fengine_per_chunk – The number of consecutive batches to store in the same chunk. A higher value allocates more GPU and system RAM; a lower value requires more work from the Python processing thread.

  • recv_reorder_tol – Maximum tolerance for jitter between received packets, as a time expressed in ADC sample ticks.

  • send_interface – IP address of the network device to use for output.

  • send_ttl – TTL for outgoing packets.

  • send_ibv – Use ibverbs for output.

  • send_packet_payload – Size for output packets (payload only; headers and padding are added to this).

  • send_affinity – CPU core for output-handling thread.

  • send_comp_vector – Completion vector for transmission, or -1 for polling. See spead2.send.UdpIbvConfig for further information.

  • send_enabled – Start with correlator output transmission enabled, without having to issue a katcp command.

  • monitor – Monitor to use for generating multiple Queue objects needed to communicate between functions, and handling basic reporting for Queue sizes and events.

  • context – Device context for katsdpsigproc. It must be a CUDA device.

  • vkgdr_handle – Handle to vkgdr for the same device as context.
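Several of these parameters combine into timestamp arithmetic. The sketch below shows one plausible reading, with the relationships themselves stated as assumptions: that a heap spans n_samples_between_spectra × n_spectra_per_heap ADC samples, that a chunk spans heaps_per_fengine_per_chunk such heaps, and that a timestamp converts to UNIX time via sync_time and adc_sample_rate. The function names and the numeric values are illustrative only.

```python
def heap_timestamp_step(n_samples_between_spectra: int, n_spectra_per_heap: int) -> int:
    """ADC samples spanned by one F-Engine heap (assumed relationship)."""
    return n_samples_between_spectra * n_spectra_per_heap


def chunk_timestamp_step(heap_step: int, heaps_per_fengine_per_chunk: int) -> int:
    """ADC samples spanned by one chunk of consecutive batches (assumed relationship)."""
    return heap_step * heaps_per_fengine_per_chunk


def timestamp_to_unix(timestamp: int, sync_time: float, adc_sample_rate: float) -> float:
    """Convert an ADC sample count to UNIX time, with sync_time as timestamp zero."""
    return sync_time + timestamp / adc_sample_rate


# Illustrative values, not taken from any real configuration.
heap_step = heap_timestamp_step(n_samples_between_spectra=8192, n_spectra_per_heap=256)
chunk_step = chunk_timestamp_step(heap_step, heaps_per_fengine_per_chunk=5)
unix_time = timestamp_to_unix(
    1_712_000_000, sync_time=1_700_000_000.0, adc_sample_rate=1_712_000_000.0
)
```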

VERSION: str = 'katgpucbf-xbgpu-1.0'

free_in_item(item: InQueueItem) → None

Return an InQueueItem to the free queue if its refcount hits zero.
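The reference-counting behaviour can be illustrated with a small sketch. InItemSketch and free_in_item_sketch are hypothetical names, and a deque stands in for the real free queue; the point is only that the item is recycled when the last pipeline releases it.

```python
import collections


class InItemSketch:
    """Hypothetical stand-in for InQueueItem, holding only a reference count."""

    def __init__(self, n_pipelines: int) -> None:
        self.refcount = n_pipelines  # one reference per pipeline using the item


def free_in_item_sketch(item: InItemSketch, free_queue: collections.deque) -> bool:
    """Drop one reference; recycle the item once no pipeline still uses it."""
    item.refcount -= 1
    if item.refcount == 0:
        free_queue.append(item)
        return True
    return False


free_queue = collections.deque()
shared_item = InItemSketch(n_pipelines=2)
released_first = free_in_item_sketch(shared_item, free_queue)   # one pipeline still busy
released_second = free_in_item_sketch(shared_item, free_queue)  # last user: recycled
```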

async on_stop() → None

Shut down processing when the device server is stopped.

This is called by aiokatcp after closing the listening socket.

populate_sensors(sensors: SensorSet, recv_sensor_timeout: float) → None

Define the sensors for an XBEngine.

async request_beam_delays(ctx, stream_name: str, *delays: str) → None

Set the delays for all inputs of a given beam and update the sensor.

Parameters:
  • stream_name – The beam to modify.

  • delays – A sequence of strings (one per input). Each string has the form delay:fringe-offset, where delay is a delay in seconds, and fringe-offset is the net phase adjustment at the centre frequency (of the whole stream, not of this engine).
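The delay:fringe-offset string format can be parsed along these lines. This is a minimal sketch: parse_beam_delays is a hypothetical helper, not part of katgpucbf, and it returns plain tuples rather than the 2D array that set_delays expects.

```python
def parse_beam_delays(delays: list[str]) -> list[tuple[float, float]]:
    """Split each 'delay:fringe-offset' string into (seconds, radians)."""
    parsed = []
    for text in delays:
        # Unpacking raises ValueError unless there is exactly one colon.
        delay_str, phase_str = text.split(":")
        parsed.append((float(delay_str), float(phase_str)))
    return parsed


parsed_delays = parse_beam_delays(["1.5e-9:0.25", "0:-0.5"])
```

Each resulting pair corresponds to one input, in the same order as the request arguments.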

async request_beam_quant_gains(ctx, stream_name: str, gain: float) → None

Set the quantisation gain for a beam and update the sensor.

Parameters:
  • stream_name – The beam to modify.

  • gain – The new gain to apply.

async request_beam_weights(ctx, stream_name: str, *weights: float) → None

Set the weights for all inputs of a given beam and update the sensor.

Parameters:
  • stream_name – The beam to modify.

  • weights – A sequence of real floating-point values (one per input).

async request_capture_start(ctx, stream_name: str, timestamp: int = 0) → None

Start transmission of stream.

Parameters:
  • stream_name – Output stream name.

  • timestamp – Minimum ADC timestamp at which to enable transmission.

async request_capture_stop(ctx, stream_name: str) → None

Stop transmission of a stream.

Parameters:

stream_name – Output stream name.

async start(descriptor_interval_s: float = 5) → None

Start the engine.

This function adds the receive, processing and transmit tasks onto the event loop. It also adds a task to continuously send the descriptor heap at an interval indicated by descriptor_interval_s.

These functions will loop forever and only exit once the XBEngine receives a SIGINT or SIGTERM.

Parameters:

descriptor_interval_s – The interval, in seconds, that the engine sleeps between successive transmissions of the data descriptor.
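A periodic descriptor task of this kind can be sketched as follows. This is an assumption-laden illustration, not the engine's code: descriptor_loop_sketch and fake_send are hypothetical, and the demo cancels the task after a short time instead of waiting for a signal.

```python
import asyncio


async def descriptor_loop_sketch(send_descriptor, interval_s: float) -> None:
    """Call send_descriptor() repeatedly, sleeping interval_s between calls."""
    while True:
        send_descriptor()
        await asyncio.sleep(interval_s)


async def run_descriptor_demo() -> int:
    count = 0

    def fake_send() -> None:  # stand-in for sending the descriptor heap
        nonlocal count
        count += 1

    task = asyncio.create_task(descriptor_loop_sketch(fake_send, interval_s=0.01))
    await asyncio.sleep(0.05)
    task.cancel()  # the loop never exits on its own, so stop it explicitly
    try:
        await task
    except asyncio.CancelledError:
        pass
    return count


n_descriptors_sent = asyncio.run(run_descriptor_demo())
```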

class katgpucbf.xbgpu.engine.XOutQueueItem(buffer_device: DeviceArray, saturated: DeviceArray, present_ants: ndarray, present_baselines: MappedArray, timestamp: int = 0)

Bases: QueueItem

Extension of the QueueItem to track antennas that have missed data.

The XOutQueueItem passed between the gpu-proc and sender loops needs to carry a record of which antennas have missed data at any point in the accumulation being processed. This record is used to determine whether any output products were affected, so that their data can be zeroed accordingly.

reset(timestamp: int = 0) → None

Reset the timestamp and present antenna tracker.

update_present_baselines() → None

Recompute present_baselines from present_ants.
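The underlying idea is that a baseline is only valid when both of its antennas were present. The sketch below illustrates that rule with plain lists; present_baselines_sketch is a hypothetical function, and the triangular baseline ordering it uses (i outer, j ≤ i inner) is an assumption that may not match the ordering used by the correlator.

```python
def present_baselines_sketch(present_ants: list[bool]) -> list[bool]:
    """Mark baseline (i, j), j <= i, as present only if both antennas are present.

    The triangular ordering here is hypothetical.
    """
    return [
        present_ants[i] and present_ants[j]
        for i in range(len(present_ants))
        for j in range(i + 1)
    ]


# Antenna 1 missed data, so every baseline involving it is marked absent.
baselines_present = present_baselines_sketch([True, False, True])
```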

class katgpucbf.xbgpu.engine.XPipeline(output: XOutput, engine: XBEngine, context: AbstractContext, vkgdr_handle: vkgdr.Vkgdr, init_send_enabled: bool, name: str = 'xpipeline')

Bases: Pipeline[XOutput, XOutQueueItem]

Processing pipeline for a single baseline-correlation-products stream.

capture_enable(*, stream_id: int, enable: bool = True, timestamp: int = 0) → None

Enable/Disable the transmission of a data product’s stream.

async gpu_proc_loop() → None

Perform all GPU processing of received data in a continuous loop.

This method does the following:

  • Get an InQueueItem off the in_queue

  • Ensure it is not a NoneType value (indicating shutdown sequence)

  • await any outstanding events associated with the InQueueItem

  • Apply GPU processing to data in the InQueueItem

    • Bind input buffer(s) accordingly

  • Obtain a free QueueItem from the out_free_queue

    • Add event marker to wait for the proc_command_queue

    • Put the prepared QueueItem on the out_queue

NOTE: An initial QueueItem needs to be obtained from the out_free_queue for the first round of processing:

  • The gpu_proc_loop requires logic to decipher the timestamp of the first heap output.

  • It also provides an opportunity to bind buffers before processing is queued.

property output: XOutput

The single Output produced by this pipeline.

async sender_loop() → None

Send heaps to the network in a continuous loop.

This method does the following:

  • Get a QueueItem from the out_queue

  • Ensure it is not a NoneType value (indicating shutdown sequence)

  • Wait for events on the item to complete (likely GPU processing)

  • Wait for an available heap buffer from the send_stream

  • Asynchronously transfer (download) GPU buffer data into the heap buffer in system RAM

    • Wait for the transfer to complete before transmitting

  • Transmit heap buffer onto the network

  • Place the QueueItem back on the out_free_queue once complete