katgpucbf.xbgpu.engine module
Define the objects that implement an entire GPU XB-Engine pipeline.
The XBEngine comprises multiple Pipeline objects that facilitate output data
products. Additionally, several subclasses of QueueItem are defined
for passing data between parts of the engine.
- class katgpucbf.xbgpu.engine.BOutQueueItem(out: DeviceArray, saturated: DeviceArray, present: ndarray, weights: MappedArray, delays: MappedArray, timestamp: int = 0)[source]
Bases:
QueueItemTransmit queue item for the beamformer pipeline.
The out, saturated, rand_states, weights and delays must have the same shape and dtype as the corresponding slots in
Beamform. This class needs to carry a record of which antennas have missed data in the currentChunkof data. This is used to determine whether any beam data has been affected, and have their data flagged accordingly.- Parameters:
out – An int8 type
DeviceArraywith shape (batches, ants, channels, spectra_per_batch, N_POLS, COMPLEX).saturated – An uint32 type
DeviceArraywith shape (n_beams,).present – A boolean array with shape (heaps_per_fengine_per_chunk, n_ants).
weights – An np.complex64
MappedArraywith shape (n_ants, n_beams).delays – An np.float32
MappedArraywith shape (n_ants, n_beams).timestamp (int) – ADC sample count of the received Chunk.
todo:: (..) – Potentially create (yet another) base class for B- and XOutQueueItems.
- weights_version
Version of weights and delays (for comparison to BPipeline._weights_version)
- class katgpucbf.xbgpu.engine.BPipeline(outputs: ~collections.abc.Sequence[~katgpucbf.xbgpu.output.BOutput], engine: ~katgpucbf.xbgpu.engine.XBEngine, context: ~katsdpsigproc.abc.AbstractContext, vkgdr_handle: <MagicMock id='140228911845920'>, init_send_enabled: bool, name: str = 'bpipeline')[source]
Bases:
Pipeline[BOutput,BOutQueueItem]Processing pipeline for a collection of
output.BOutput.- capture_enable(*, stream_id: int, enable: bool = True, timestamp: int = 0) None[source]
Enable/Disable the transmission of a data product’s stream.
- async gpu_proc_loop() None[source]
Perform all GPU processing of received data in a continuous loop.
This method does the following:
Get an InQueueItem off the in_queue
Ensure it is not a NoneType value (indicating shutdown sequence)
await any outstanding events associated with the InQueueItem
Apply GPU processing to data in the InQueueItem
Bind input buffer(s) accordingly
Obtain a free QueueItem from the out_free_queue
Add event marker to wait for the proc_command_queue
Put the prepared QueueItem on the out_queue
NOTE: An initial QueueItem needs to be obtained from the out_free_queue for the first round of processing:
The gpu_proc_loop requires logic to decipher the timestamp of the first heap output.
It also provides an opportunity to bind buffers before processing is queued.
- async sender_loop() None[source]
Send heaps to the network in a continuous loop.
This method does the following:
Get a QueueItem from the out_queue
Ensure it is not a NoneType value (indicating shutdown sequence)
Wait for events on the item to complete (likely GPU processing)
Wait for an available heap buffer from the send_stream
Asynchronously Transfer/Download GPU buffer data into the heap buffer in system RAM
Wait for the transfer to complete, before
Transmit heap buffer onto the network
Place the QueueItem back on the out_free_queue once complete
- set_delays(stream_id: int, delays: ndarray) int[source]
Set the beam steering delays for one beam.
- Parameters:
stream_id – The index of the beam whose quantisation gain is being set.
delays – A 2D array of delay coefficients. The first axis corresponds to inputs. The second axis has length two, with the first element containing the delay in seconds, and the second containing a channel-independent phase rotation in radians.
- Returns:
_weights_steady, the timestamp that includes the effect of a beam-delay update made now.- Return type:
- set_quant_gain(stream_id: int, gain: float) int[source]
Set the quantisation gain for one beam.
- Parameters:
stream_id – The index of the beam whose quantisation gain is being set.
gain – Real-valued quantisation gain.
- Returns:
_weights_steady, the timestamp that includes the effect of a beam-quant-gain update made now.- Return type:
- set_weights(stream_id: int, weights: ndarray) int[source]
Set the beam weights for one beam.
- Parameters:
stream_id – The index of the beam whose weights are being set.
weights – A 1D array containing real-valued weights (per input).
- Returns:
_weights_steady, the timestamp that includes the effect of a beam-weights update made now.- Return type:
- class katgpucbf.xbgpu.engine.InQueueItem(buffer_device: DeviceArray, present: ndarray, timestamp: int = 0)[source]
Bases:
QueueItemExtension of the QueueItem for use in receive queues.
The InQueueItem holds a reference to the received Chunk because its data is copied into the GPU buffer asynchronously. Once the GPU processing loop is done with the Chunk’s data, it hands it back to the receiving loop to reuse the resource. It is for this reason that the Chunk’s heap presence is stored separately.
The InQueueItem also holds a reference count of the number of pipelines still using this item. This is to ensure the item isn’t freed before each pipeline has had a chance to use the received data.
- class katgpucbf.xbgpu.engine.Pipeline(outputs: Sequence, name: str, engine: XBEngine, context: AbstractContext)[source]
Bases:
GenericBase Pipeline class to build on.
SPEAD heaps utilised by this pipeline are first received at the
XBEngine-level and uploaded to the GPU for processing.The pipeline then performs GPU-accelerated processing on the uploaded data before packetising and sending the results back out onto the network.
Data processing occurs across three separate async methods. Data is passed between these methods using
asyncio.Queues. See each method for more details.XBEngine._receiver_loop(),
Items passed between queues may still have GPU operations in progress.
QueueItemprovides mechanisms to wait for the in-progress work to complete.- Parameters:
outputs – Sequence of Output config for data product (BOutput or XOutput).
name – Name of Pipeline.
engine – The owning engine.
context – CUDA context for device work.
- add_in_item(item: InQueueItem) None[source]
Append a newly-received
InQueueItemto the_in_queue.
- abstractmethod capture_enable(*, stream_id: int, enable: bool = True, timestamp: int = 0) None[source]
Enable/Disable the transmission of a data product’s stream.
- abstractmethod async gpu_proc_loop() None[source]
Perform all GPU processing of received data in a continuous loop.
This method does the following:
Get an InQueueItem off the in_queue
Ensure it is not a NoneType value (indicating shutdown sequence)
await any outstanding events associated with the InQueueItem
Apply GPU processing to data in the InQueueItem
Bind input buffer(s) accordingly
Obtain a free QueueItem from the out_free_queue
Add event marker to wait for the proc_command_queue
Put the prepared QueueItem on the out_queue
NOTE: An initial QueueItem needs to be obtained from the out_free_queue for the first round of processing:
The gpu_proc_loop requires logic to decipher the timestamp of the first heap output.
It also provides an opportunity to bind buffers before processing is queued.
- n_in_items = 3
- n_out_items = 2
- abstractmethod async sender_loop() None[source]
Send heaps to the network in a continuous loop.
This method does the following:
Get a QueueItem from the out_queue
Ensure it is not a NoneType value (indicating shutdown sequence)
Wait for events on the item to complete (likely GPU processing)
Wait for an available heap buffer from the send_stream
Asynchronously Transfer/Download GPU buffer data into the heap buffer in system RAM
Wait for the transfer to complete, before
Transmit heap buffer onto the network
Place the QueueItem back on the out_free_queue once complete
- shutdown() None[source]
Start a graceful shutdown after the final call to
add_in_item().
- class katgpucbf.xbgpu.engine.XBEngine(*, katcp_host: str, katcp_port: int, adc_sample_rate: float, bandwidth: float, send_rate_factor: float, n_ants: int, n_channels: int, n_channels_per_substream: int, n_samples_between_spectra: int, n_spectra_per_heap: int, sample_bits: int, sync_time: float, channel_offset_value: int, outputs: list[~katgpucbf.xbgpu.output.Output], src: list[tuple[str, int]], recv_interface: str, recv_ibv: bool, recv_affinity: int, recv_comp_vector: int, recv_buffer: int, send_interface: str, send_buffer: int, send_ttl: int, send_ibv: bool, send_packet_payload: int, send_affinity: int, send_comp_vector: int, heaps_per_fengine_per_chunk: int, recv_reorder_tol: int, send_enabled: bool, monitor: ~katgpucbf.monitor.Monitor, context: ~katsdpsigproc.abc.AbstractContext, vkgdr_handle: <MagicMock id='140228911845920'>)[source]
Bases:
EngineGPU XB-Engine pipeline.
Currently the B-Engine functionality has not been added. This class currently only creates an X-Engine pipeline.
The XB-Engine conducts the reception of SPEAD heaps from F-engines and makes the data available to the constituent pipelines. In order to reduce the load on the main thread, received data is collected into chunks. A chunk consists of multiple batches of F-Engine heaps where a batch is a collection of heaps from all F-Engine with the same timestamp.
There is a seperate function for sending descriptors onto the network.
Class initialisers allocate all memory buffers to be used during the lifetime of the XBEngine object. These buffers are continuously reused to ensure memory use remains constrained.
Todo
A lot of the sensors are common to both the F- and X-engines. It may be worth investigating some kind of abstract base class for engines to build on top of.
- Parameters:
katcp_host – Hostname or IP on which to listen for KATCP C&M connections.
katcp_port – Network port on which to listen for KATCP C&M connections.
adc_sample_rate – Sample rate of the digitisers in the current array (in Hz). This value is required to calculate the packet spacing of the output heaps. If it is set incorrectly, the packet spacing could be too large causing the pipeline to stall as heaps queue at the sender faster than they are sent.
bandwidth – Total bandwidth across n_channels channels (in Hz). This is used in delay calculations.
send_rate_factor – Configure the spead2 sender with a rate proportional to this factor. This value is intended to dictate a data transmission rate slightly higher/faster than the ADC rate. A factor of zero (0) tells the sender to transmit as fast as possible.
n_ants – The number of antennas to be correlated.
n_channels – The total number of frequency channels out of the F-Engine.
n_channels_per_substream – The number of frequency channels contained in the incoming F-engine data stream.
n_samples_between_spectra – The number of samples between frequency spectra received.
n_spectra_per_heap – The number of time samples received per frequency channel.
sample_bits – The number of bits per sample. Only 8 bits is supported at the moment.
sync_time – UNIX time corresponding to timestamp zero
channel_offset_value – The index of the first channel in the subset of channels processed by this XB-Engine. Used to set the value in the XB-Engine output heaps for spectrum reassembly by the downstream receiver.
outputs – Output streams to generate.
src – Endpoint for the incoming data.
recv_interface – IP address of the network device to use for input.
recv_ibv – Use ibverbs for input.
recv_affinity – Specific CPU core to assign the receive stream processing thread to.
recv_comp_vector – Completion vector for source stream, or -1 for polling. See
spead2.recv.UdpIbvConfigfor further information.recv_buffer – The size of the network receive buffer.
heaps_per_fengine_per_chunk – The number of consecutive batches to store in the same chunk. The higher this value is, the more GPU and system RAM is allocated, the lower, the more work the Python processing thread is required to do.
recv_reorder_tol – Maximum tolerance for jitter between received packets, as a time expressed in ADC sample ticks.
send_interface – IP address of the network device to use for output.
send_ttl – TTL for outgoing packets.
send_ibv – Use ibverbs for output.
send_packet_payload – Size for output packets (payload only; headers and padding are added to this).
send_affinity – CPU core for output-handling thread.
send_comp_vector – Completion vector for transmission, or -1 for polling. See
spead2.send.UdpIbvConfigfor further information.send_enabled – Start with correlator output transmission enabled, without having to issue a katcp command.
monitor –
Monitorto use for generating multipleQueueobjects needed to communicate between functions, and handling basic reporting forQueuesizes and events.context – Device context for katsdpsigproc. It must be a CUDA device.
vkgdr_handle – Handle to vkgdr for the same device as context.
- free_in_item(item: InQueueItem) None[source]
Return an InQueueItem to the free queue if its refcount hits zero.
- async on_stop() None[source]
Shut down processing when the device server is stopped.
This is called by aiokatcp after closing the listening socket.
- populate_sensors(sensors: SensorSet, recv_sensor_timeout: float) None[source]
Define the sensors for an XBEngine.
- async request_beam_delays(ctx, stream_name: str, *delays: str) None[source]
Set the delays for all inputs of a given beam and update the sensor.
- Parameters:
stream_name – The beam to modify
delays – A sequence of strings (one per input). Each string has the form
delay:fringe-offset, wheredelayis a delay in seconds, andfringe-offsetis the net phase adjustment at the centre frequency (of the whole stream, not of this engine).
- async request_beam_quant_gains(ctx, stream_name: str, gain: float) None[source]
Set the quantisation gain for a beam and update the sensor.
- Parameters:
stream_name – The beam to modify.
gain – The new gain to apply.
- async request_beam_weights(ctx, stream_name: str, *weights: float) None[source]
Set the weights for all inputs of a given beam and update the sensor.
- Parameters:
stream_name – The beam to modify
weights – A sequence of real floating-point values (one per input).
- async request_capture_start(ctx, stream_name: str, timestamp: int = 0) None[source]
Start transmission of stream.
- Parameters:
stream_name – Output stream name.
timestamp – Minimum ADC timestamp at which to enable transmission.
- async request_capture_stop(ctx, stream_name: str) None[source]
Stop transmission of a stream.
- Parameters:
stream_name – Output stream name.
- async start(descriptor_interval_s: float = 5) None[source]
Start the engine.
This function adds the receive, processing and transmit tasks onto the event loop. It also adds a task to continuously send the descriptor heap at an interval indicated by descriptor_interval_s.
These functions will loop forever and only exit once the XBEngine receives a SIGINT or SIGTERM.
- Parameters:
descriptor_interval_s – The interval used to dictate the ‘engine sleep interval’ between sending the data descriptor.
- class katgpucbf.xbgpu.engine.XOutQueueItem(buffer_device: DeviceArray, saturated: DeviceArray, present_ants: ndarray, present_baselines: MappedArray, timestamp: int = 0)[source]
Bases:
QueueItemExtension of the QueueItem to track antennas that have missed data.
The XOutQueueItem between the gpu-proc and sender loops needs to carry a record of which antennas have missed data at any point in the accumulation being processed. This is used to determine whether any output products were affected, and have their data zeroed accordingly.
- class katgpucbf.xbgpu.engine.XPipeline(output: ~katgpucbf.xbgpu.output.XOutput, engine: ~katgpucbf.xbgpu.engine.XBEngine, context: ~katsdpsigproc.abc.AbstractContext, vkgdr_handle: <MagicMock id='140228911845920'>, init_send_enabled: bool, name: str = 'xpipeline')[source]
Bases:
Pipeline[XOutput,XOutQueueItem]Processing pipeline for a single baseline-correlation-products stream.
- capture_enable(*, stream_id: int, enable: bool = True, timestamp: int = 0) None[source]
Enable/Disable the transmission of a data product’s stream.
- async gpu_proc_loop() None[source]
Perform all GPU processing of received data in a continuous loop.
This method does the following:
Get an InQueueItem off the in_queue
Ensure it is not a NoneType value (indicating shutdown sequence)
await any outstanding events associated with the InQueueItem
Apply GPU processing to data in the InQueueItem
Bind input buffer(s) accordingly
Obtain a free QueueItem from the out_free_queue
Add event marker to wait for the proc_command_queue
Put the prepared QueueItem on the out_queue
NOTE: An initial QueueItem needs to be obtained from the out_free_queue for the first round of processing:
The gpu_proc_loop requires logic to decipher the timestamp of the first heap output.
It also provides an opportunity to bind buffers before processing is queued.
- async sender_loop() None[source]
Send heaps to the network in a continuous loop.
This method does the following:
Get a QueueItem from the out_queue
Ensure it is not a NoneType value (indicating shutdown sequence)
Wait for events on the item to complete (likely GPU processing)
Wait for an available heap buffer from the send_stream
Asynchronously Transfer/Download GPU buffer data into the heap buffer in system RAM
Wait for the transfer to complete, before
Transmit heap buffer onto the network
Place the QueueItem back on the out_free_queue once complete