katgpucbf.fgpu.engine module

FEngine class, which combines all the processing steps for a single digitiser data stream.

class katgpucbf.fgpu.engine.FEngine(*, katcp_host: str, katcp_port: int, context: ~katsdpsigproc.abc.AbstractContext, vkgdr_handle: ~vkgdr.Vkgdr, srcs: str | list[tuple[str, int]], recv_interface: list[str] | None, recv_ibv: bool, recv_affinity: list[int], recv_comp_vector: list[int], recv_packet_samples: int, recv_buffer: int, send_interface: list[str], send_ttl: int, send_ibv: bool, send_packet_payload: int, send_affinity: int, send_comp_vector: int, send_buffer: int, outputs: list[~katgpucbf.fgpu.output.Output], adc_sample_rate: float, send_rate_factor: float, feng_id: int, n_ants: int, chunk_samples: int, chunk_jones: int, dig_sample_bits: int, send_sample_bits: int, max_delay_diff: int, gain: complex, sync_time: float, mask_timestamp: bool, use_vkgdr: bool, use_peerdirect: bool, monitor: ~katgpucbf.monitor.Monitor)

Bases: Engine

Top-level class running the whole F-engine: it receives digitiser data, processes it on the GPU and transmits the results.

Parameters:
  • katcp_host – Hostname or IP on which to listen for KATCP C&M connections.

  • katcp_port – Network port on which to listen for KATCP C&M connections.

  • context – The accelerator (CUDA) context to use for running the FEngine.

  • vkgdr_handle – Handle to vkgdr for the same device as context.

  • srcs – A list of source endpoints for the incoming data, or a pcap filename.

  • recv_interface – IP addresses of the network devices to use for input.

  • recv_ibv – Use ibverbs for input.

  • recv_affinity – List of CPU cores for input-handling threads. Must contain one entry per polarisation.

  • recv_comp_vector – Completion vectors for source streams, or -1 for polling. See spead2.recv.UdpIbvConfig for further information.

  • recv_packet_samples – The number of samples per digitiser packet.

  • recv_buffer – The size of the network receive buffer.

  • send_interface – IP addresses of the network devices to use for output.

  • send_ttl – TTL for outgoing packets.

  • send_ibv – Use ibverbs for output.

  • send_packet_payload – Size of output packets (voltage payload only; headers and padding are added on top of this).

  • send_affinity – CPU core for output-handling thread.

  • send_comp_vector – Completion vector for transmission, or -1 for polling. See spead2.send.UdpIbvConfig for further information.

  • send_buffer – Size of the network send buffer.

  • outputs – Output streams to generate.

  • adc_sample_rate – Digitiser sampling rate (in Hz), used to determine transmission rate.

  • send_rate_factor – Configure the spead2 sender with a rate proportional to this factor. The factor is intended to give a transmission rate slightly faster than the ADC rate. A factor of zero (0) tells the sender to transmit as fast as possible.

  • feng_id – ID of the F-engine indicating which one in the array this is. Included in the output heaps so that the X-engine can determine where the data fits in.

  • n_ants – The number of antennas in the array. Used for numbering heaps so as not to collide with other antennas transmitting to the same X-engine.

  • chunk_samples – Number of samples in each input chunk, excluding padding samples.

  • chunk_jones – Number of Jones vectors in each output chunk.

  • max_delay_diff – Maximum supported difference between delays across polarisations (in samples).

  • gain – Initial eq gain for all channels.

  • sync_time – UNIX time at which the digitisers were synced.

  • mask_timestamp – Mask off bottom bits of timestamp (workaround for broken digitiser).

  • use_vkgdr – Assemble chunks directly in GPU memory (requires Vulkan).

  • use_peerdirect – Send chunks directly from GPU memory (requires supported GPU).

  • monitor – Monitor to use for generating the Queue objects needed to communicate between functions, and for basic reporting of Queue sizes and events.

VERSION: str = 'katgpucbf-fgpu-1.0'

free_in_item(item: InQueueItem) → None

Return an InQueueItem to the free queue if its refcount hits zero.

make_send_streams(output: Output, n_data_heaps: int, chunks: Sequence[Chunk]) → list[spead2.send.asyncio.AsyncStream]

Create send streams for a pipeline.

This method should only be called by Pipeline.

async on_stop() → None

Shut down processing when the device server is stopped.

This is called by aiokatcp after closing the listening socket. It also handles any exceptions thrown unexpectedly in the processing loops.

async request_delays(ctx, stream_name: str, start_time: Timestamp, *delays: str) → None

Add a new first-order polynomial to the delay and fringe correction model.

Todo

Make the request’s fail replies more informative in the case of malformed requests.

async request_gain(ctx, stream_name: str, input: int, *values: str) → tuple[str, ...]

Set or query the eq gains.

If no values are provided, the gains are simply returned.

Parameters:
  • stream_name – Output stream name

  • input – Input number (0 or 1)

  • values – Complex values. There must either be a single value (used for all channels), or a value per channel.
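A minimal sketch of how the values arguments could be expanded into one gain per channel, assuming the number of channels is known and each value is a string that Python's complex() accepts. expand_gains is a hypothetical helper, not part of this module.

```python
import numpy as np


def expand_gains(values: tuple[str, ...], n_channels: int) -> np.ndarray:
    """Convert katcp gain arguments into one complex gain per channel."""
    parsed = np.array([complex(v) for v in values], dtype=np.complex64)
    if len(parsed) == 1:
        # A single value applies to every channel.
        return np.full(n_channels, parsed[0], dtype=np.complex64)
    if len(parsed) == n_channels:
        return parsed
    raise ValueError(f"expected 1 or {n_channels} values, got {len(parsed)}")
```

For example, expand_gains(("0.5+0.5j",), 4) yields four copies of 0.5+0.5j, which is the form set_gains() expects.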

async request_gain_all(ctx, stream_name: str, *values: str) → None

Set the eq gains for all inputs.

Parameters:
  • stream_name – Output stream name

  • values – Complex values. There must either be a single value (used for all channels), or a value per channel, or "default" to reset gains to the default.

async start(descriptor_interval_s: float = 5) → None

Start the engine.

This function adds the receive, processing and transmit tasks to the event loop. It also adds a task that continuously sends the descriptor heaps at an interval based on descriptor_interval_s. See _run_descriptors_loop() for more details.

Parameters:
  • descriptor_interval_s – The base interval, multiplied by feng_id and by n_ants to give the initial ‘engine sleep interval’ and the ‘send interval’ respectively.
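One reading of this parameter, sketched below as an assumption rather than the engine's actual code: each engine first sleeps descriptor_interval_s * feng_id seconds and then sends descriptors every descriptor_interval_s * n_ants seconds, so the engines in the array take turns instead of all sending at once. descriptor_schedule is a hypothetical helper that lists the resulting send times.

```python
def descriptor_schedule(
    feng_id: int, n_ants: int, descriptor_interval_s: float, n_sends: int
) -> list[float]:
    """Times (seconds after start) at which one engine would send descriptors."""
    initial_sleep = descriptor_interval_s * feng_id  # Staggers engines by ID
    send_interval = descriptor_interval_s * n_ants  # One full cycle of the array
    return [initial_sleep + k * send_interval for k in range(n_sends)]
```

With n_ants=4 and the default 5 s, engine 0 would send at 0, 20, 40 s and engine 1 at 5, 25, 45 s, so no two engines share a slot.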

class katgpucbf.fgpu.engine.InQueueItem(context: AbstractContext, layout: Layout, n_samples: int, timestamp: int = 0, *, use_vkgdr: bool = False)

Bases: QueueItem

Item for use in input queues.

This Item references GPU memory regions for input samples from both polarisations, with metadata describing their dimensions (number of samples and bitwidth of samples) in addition to the features of QueueItem.

An example of usage is as follows:

# In the receive function
my_in_item.samples.set_region(...)  # Start copying sample data to the GPU
my_in_item.add_marker(command_queue)  # Mark the point at which the copy completes
self._in_queue.put_nowait(my_in_item)
...
# In the processing function
next_in_item = await self._in_queue.get()  # Get the item from the queue
next_in_item.enqueue_wait_for_events(command_queue)  # Wait for its data to be completely copied
...  # Carry on executing kernels or whatever needs to be done with the data

Parameters:
  • context – CUDA context in which to allocate memory.

  • layout – Layout of the source stream.

  • n_samples (int) – Number of digitised samples to hold, per polarisation

  • timestamp (int) – Timestamp of the oldest digitiser sample represented in the data.

  • use_vkgdr – Use vkgdr to write sample data directly to the GPU rather than staging in host memory.

property capacity: int

Memory capacity in samples.

The amount of space allocated to each polarisation in the samples array, counted in samples.

chunk: Chunk | None = None

Chunk to return to recv after processing (used with vkgdr only).

dig_sample_bits: int

Bitwidth of each sample in the samples array.

property end_timestamp: int

Past-the-end (i.e. latest plus 1) timestamp of the item.

n_samples: int

Number of samples per polarisation currently held in the samples array.

present: ndarray

Bitmask indicating which packets were present in the chunk.

present_cumsum: ndarray

Cumulative sum over present (separately per pol). It is up to the caller to compute it at the appropriate time.
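A cumulative sum turns "how many packets arrived in this range?" into two lookups. The sketch below assumes present has shape (pols, n_packets) and prepends a zero column so that the count over packets [start, stop) is cumsum[:, stop] - cumsum[:, start]; the exact layout the engine uses is not specified here, and packets_present is a hypothetical helper.

```python
import numpy as np

# Hypothetical presence mask: 2 pols x 6 packets (True = packet received).
present = np.array(
    [
        [1, 1, 0, 1, 1, 1],
        [1, 1, 1, 1, 0, 0],
    ],
    dtype=bool,
)

# Running count per pol, with a leading zero column so index 0 needs no special case.
counts = np.cumsum(present, axis=1, dtype=np.int64)
present_cumsum = np.concatenate(
    [np.zeros((present.shape[0], 1), dtype=np.int64), counts], axis=1
)


def packets_present(pol: int, start: int, stop: int) -> int:
    """Number of packets received for one pol in the half-open range [start, stop)."""
    return int(present_cumsum[pol, stop] - present_cumsum[pol, start])
```

Computing the sum once per chunk makes each range query O(1), which matters when many overlapping ranges (for example, one per output spectrum) are checked.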

refcount: int

Number of pipelines still using this item.

reset(timestamp: int = 0) → None

Reset the item.

Set the timestamp (zero by default), empty the event list and set the number of samples to zero.

samples: DeviceArray | None

A device memory region for storing the raw samples.

class katgpucbf.fgpu.engine.OutQueueItem(vkgdr_handle: ~vkgdr.Vkgdr, compute: ~katgpucbf.fgpu.compute.Compute, spectra_samples: int, timestamp: int = 0)

Bases: QueueItem

Item for use in output queues.

This Item references GPU memory regions for output spectra from both polarisations, together with the fine-delay, phase and gain values applied to them, in addition to the features of QueueItem.

An example of usage is as follows:

# In the processing function
compute.run_some_dsp(my_out_item.spectra)  # Placeholder: run the DSP, whatever it is
my_out_item.add_marker(command_queue)
self._out_queue.put_nowait(my_out_item)
...
# In the transmit function
next_out_item = await self._out_queue.get()  # Get the item from the queue
next_out_item.enqueue_wait_for_events(download_queue)  # Wait for the event indicating DSP is finished
next_out_item.spectra.get_async(download_queue)  # Start copying data back to the host
...  # Normally you'd put a marker on the queue again so that you know when the
    # copy is finished, but this needn't be attached to the item unless
    # there's another queue afterwards.

Parameters:
  • compute – F-engine Operation Sequence detailing the DSP happening on the data, including details for buffers, context, shapes, slots, etc.

  • spectra_samples (int) – Number of ADC samples between spectra.

  • timestamp (int) – Timestamp of the first spectrum in the OutQueueItem.

property capacity: int

Number of spectra stored in memory for each polarisation.

property channels: int

Number of channels stored in the item.

chunk: Chunk | None = None

Corresponding chunk for transmission (only used in PeerDirect mode).

dig_total_power: DeviceArray | None

Output sum of squared samples, per pol

property end_timestamp: int

Past-the-end timestamp of the item.

This follows Python’s normal exclusive-end convention.

fine_delay: MappedArray

Per-spectrum fine delays

gains: MappedArray

Per-channel gains

gains_version: int

Gain version number matching gains (for comparison to Pipeline.gains_version)
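Keeping a version number on each item means the full gain array need not be copied for every chunk: the item only needs refreshing when its version lags the pipeline's. The sketch below assumes that is how the two counters are compared; DemoPipeline, DemoItem and refresh_gains are hypothetical.

```python
from dataclasses import dataclass

import numpy as np


@dataclass
class DemoPipeline:
    gains: np.ndarray
    gains_version: int = 0

    def set_gains(self, gains: np.ndarray) -> None:
        self.gains[:] = gains
        self.gains_version += 1  # Invalidates every item's cached copy


@dataclass
class DemoItem:
    gains: np.ndarray
    gains_version: int = -1  # Never matches, so the first use always copies
    copies: int = 0  # Counts refreshes, for illustration only


def refresh_gains(item: DemoItem, pipeline: DemoPipeline) -> None:
    """Copy the pipeline's gains into the item only if they have changed."""
    if item.gains_version != pipeline.gains_version:
        item.gains[:] = pipeline.gains
        item.gains_version = pipeline.gains_version
        item.copies += 1
```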

n_spectra: int

Number of spectra contained in spectra.

property next_timestamp: int

Timestamp of the next OutQueueItem after this one.
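Assuming timestamps count ADC samples and items are laid end to end, the timestamp properties could relate as sketched below: end_timestamp uses the spectra actually filled (n_spectra), while next_timestamp uses the full capacity. DemoOutItem is a hypothetical stand-in, not the real class.

```python
from dataclasses import dataclass


@dataclass
class DemoOutItem:
    timestamp: int  # ADC sample index of the first spectrum
    spectra_samples: int  # ADC samples between consecutive spectra
    capacity: int  # Spectra that fit in the item, per pol
    n_spectra: int = 0  # Spectra actually filled so far

    @property
    def end_timestamp(self) -> int:
        # Exclusive end: one spectrum step past the last filled spectrum.
        return self.timestamp + self.n_spectra * self.spectra_samples

    @property
    def next_timestamp(self) -> int:
        # Start of the following item, if items are laid end to end.
        return self.timestamp + self.capacity * self.spectra_samples
```

For example, DemoOutItem(timestamp=1000, spectra_samples=8192, capacity=256, n_spectra=100) has end_timestamp 820200 and next_timestamp 2098152.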

phase: MappedArray

Per-spectrum phase offsets

property pols: int

Number of polarisations.

present: ndarray

Bitmask indicating which spectra contain valid data and should be transmitted.

reset(timestamp: int = 0) → None

Reset the item.

Set the item’s timestamp (zero by default), empty the event list and set the number of spectra to zero.

This does not zero the dig_total_power counters. Use reset_all() for that.

reset_all(command_queue: AbstractCommandQueue, timestamp: int = 0) → None

Fully reset the item.

In addition to the work done by reset(), zero out the GPU accumulators using the given command queue. No event is recorded for this operation; it is assumed that the same command queue will subsequently be used to operate on the accumulators.

saturated: DeviceArray

Output saturation count, per pol

spectra: DeviceArray

Output data, a collection of spectra, arranged in memory by pol and by heap.

spectra_samples: int

Number of ADC samples between spectra

class katgpucbf.fgpu.engine.Pipeline(output: Output, engine: FEngine, context: AbstractContext, dig_stats: bool)

Bases: object

Processing pipeline for a single output stream.

Parameters:
  • output – The output stream to produce

  • engine – The owning engine

  • context – CUDA context for device work

  • dig_stats – If true, this pipeline is responsible for producing the digitiser statistics such as dig-rms-dbfs.

add_in_item(item: InQueueItem) → None

Append a newly-received InQueueItem.

delay_update_timestamp() → int

Return a timestamp by which an update to the delay model will take effect.

async run_processing() → None

Do the hard work of the F-engine.

The work is done entirely on the GPU. First the coarse delay is applied, then a batched FFT, and finally fine-delay and phase correction, quantisation and corner-turn are performed.
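A toy numpy sketch of the steps listed above for a single polarisation, assuming real input, 2 * channels samples per spectrum, a plain FFT with no PFB-FIR weighting, and an exp(-2πi k τ / N) sign convention for the fine delay. f_engine_sketch is hypothetical and is not how the GPU kernels are written.

```python
import numpy as np


def f_engine_sketch(
    samples: np.ndarray,
    channels: int,
    coarse_delay: int,
    fine_delay: float,
    phase: float,
    gain: complex,
) -> np.ndarray:
    """Toy single-pol F-engine returning int8 (real, imag) pairs, shape (channels, spectra, 2)."""
    step = 2 * channels  # Real samples per spectrum
    # Coarse delay: shift by a whole number of samples.
    delayed = samples[coarse_delay:]
    n_spectra = len(delayed) // step
    blocks = delayed[: n_spectra * step].reshape(n_spectra, step)
    # Batched FFT; drop the Nyquist bin to keep exactly `channels` bins.
    spectra = np.fft.rfft(blocks, axis=1)[:, :channels]
    # Fine delay (fraction of a sample) plus phase: a per-channel phase rotation.
    chan = np.arange(channels)
    rotation = np.exp(-1j * np.pi * chan * fine_delay / channels + 1j * phase)
    corrected = spectra * rotation * gain
    # Quantise real and imaginary parts to saturating int8.
    quant = np.stack([corrected.real, corrected.imag], axis=-1)
    quant = np.clip(np.rint(quant), -127, 127).astype(np.int8)
    # Corner turn: channel-major instead of time-major.
    return quant.transpose(1, 0, 2)
```

The fine delay can be applied after the FFT because a delay of τ samples corresponds to a phase ramp across channels, which is far cheaper than resampling in the time domain.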

async run_transmit() → None

Get the processed data from the GPU to the network.

This could be done either with or without PeerDirect. In the non-PeerDirect case, OutQueueItem objects are pulled from the _out_queue. We wait for the events that mark the end of the processing, then copy the data to host memory before turning it over to the sender for transmission on the network. The “empty” item is then returned to run_processing() via the _out_free_queue, and once the chunk has been transmitted it is returned to _send_free_queue.

In the PeerDirect case, the item and the chunk are bound together and share memory. In this case _send_free_queue is unused. The item is only returned to _out_free_queue once it has been fully transmitted.
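The non-PeerDirect recycling of items can be sketched with plain asyncio.Queue objects. This is a minimal sketch under stated assumptions: items are hypothetical dicts, the GPU work, host copy and network send are stubbed out, and the separate _send_free_queue for host-side chunks is omitted.

```python
import asyncio


async def processing(out_queue: asyncio.Queue, out_free_queue: asyncio.Queue, n_items: int) -> None:
    """Stand-in for run_processing(): fill free items and queue them for transmit."""
    for timestamp in range(n_items):
        item = await out_free_queue.get()  # Wait until an empty item is available
        item["timestamp"] = timestamp  # Placeholder for the GPU work
        await out_queue.put(item)
    await out_queue.put(None)  # Sentinel: no more items


async def transmit(out_queue: asyncio.Queue, out_free_queue: asyncio.Queue, sent: list) -> None:
    """Stand-in for run_transmit() in the non-PeerDirect case."""
    while (item := await out_queue.get()) is not None:
        host_copy = dict(item)  # Placeholder for the device-to-host copy
        out_free_queue.put_nowait(item)  # Item is reusable once the copy is done
        sent.append(host_copy["timestamp"])  # Placeholder for the network send


async def main(n_items: int = 5, pool_size: int = 2) -> list:
    out_queue: asyncio.Queue = asyncio.Queue()
    out_free_queue: asyncio.Queue = asyncio.Queue()
    for _ in range(pool_size):
        out_free_queue.put_nowait({"timestamp": None})
    sent: list = []
    await asyncio.gather(
        processing(out_queue, out_free_queue, n_items),
        transmit(out_queue, out_free_queue, sent),
    )
    return sent
```

Because the item goes back to the free queue as soon as its data is on the host, a small fixed pool of GPU buffers is enough even when the network send is slow.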

set_gains(input: int, gains: ndarray) → None

Set the eq gains for one polarisation and update the sensor.

The gains must contain one entry per channel; the shortcut of supplying a single value is handled by request_gain().

shutdown() → None

Start graceful shutdown after the final call to add_in_item().

property spectra: int

Number of spectra per output chunk.

static update_delay_sensor(delay_models: Sequence[LinearDelayModel], *, delay_sensor: Sensor, adc_sample_rate: float) → None

Update the delay sensor upon loading of a new model.

The delay_models are accepted as a read-only Sequence from the MultiDelayModel, even though only the first one is needed to update the sensor.

The delay and phase-rate values need to be scaled back to their original units (delay in s, phase rate in rad/s).
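A minimal sketch of this scaling, assuming the loaded model stores the delay in ADC samples and the phase rate in radians per sample. to_sensor_units is a hypothetical helper.

```python
def to_sensor_units(
    delay_samples: float, phase_rate_rad_per_sample: float, adc_sample_rate: float
) -> tuple[float, float]:
    """Convert per-sample model values back to (delay in s, phase rate in rad/s)."""
    delay_s = delay_samples / adc_sample_rate  # samples / (samples/s) = s
    phase_rate_rad_per_s = phase_rate_rad_per_sample * adc_sample_rate  # rad/sample * samples/s
    return delay_s, phase_rate_rad_per_s
```

At 1712 MHz, for example, a delay of 1712 samples maps back to 1 µs.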

katgpucbf.fgpu.engine.dig_rms_dbfs_status(value: float) → Status

Compute status for dig-rms-dbfs sensor.

katgpucbf.fgpu.engine.format_complex(value: Complex) → str

Format a complex number for a katcp request.

The ICD specifies that complex numbers have the format real+imaginary j. Python’s default formatting contains parentheses if real is non-zero, and omits the real part if it is zero. For a numpy value it also only includes enough significant figures for the dtype, which means that reading it back as a Python complex may not give exactly the same value.
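One way to address both problems, sketched here rather than taken from this module: convert to Python's complex (which also widens numpy values), then use repr() on each part, since repr() of a float gives the shortest string that reads back as exactly the same value. format_complex_sketch is a hypothetical name.

```python
from numbers import Complex


def format_complex_sketch(value: Complex) -> str:
    """Format as 'real+imagj', always including the real part, with round-trip precision."""
    c = complex(value)  # Also widens numpy complex64 to Python's complex
    real = repr(c.real)
    imag = repr(c.imag)
    sign = "" if imag.startswith("-") else "+"  # repr already includes a minus sign
    return f"{real}{sign}{imag}j"
```

For comparison, str(1j) gives "1j" and str(1+2j) gives "(1+2j)", whereas this sketch gives "0.0+1.0j" and "1.0+2.0j".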

katgpucbf.fgpu.engine.generate_ddc_weights(output: NarrowbandOutput, adc_sample_rate: float) → ndarray

Generate filter weights for the narrowband low-pass filter.

katgpucbf.fgpu.engine.generate_pfb_weights(step: int, taps: int, w_cutoff: float, window_function: WindowFunction) → ndarray

Generate weights for the F-engine’s PFB-FIR, using the given window function.

The resulting weights are normalised such that the sum of squares is 1.

Parameters:
  • step – Number of samples per spectrum.

  • taps – Number of taps in the PFB-FIR.

  • w_cutoff – Scaling factor for the width of the channel response.

  • window_function – Window function to use.

Returns:

Array containing the weights for the PFB-FIR filters, as single-precision floats.

Return type:

numpy.ndarray
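A minimal sketch of a windowed-sinc prototype filter of this kind, assuming a fixed Hann window in place of window_function and a sinc whose main lobe spans w_cutoff channels. pfb_weights_sketch is hypothetical; the real function's window conventions may differ.

```python
import numpy as np


def pfb_weights_sketch(step: int, taps: int, w_cutoff: float) -> np.ndarray:
    """Hann-windowed sinc prototype filter for a PFB-FIR, with unit sum of squares."""
    n = step * taps
    # Sample positions in units of spectra, centred on the middle of the filter.
    x = (np.arange(n) - (n - 1) / 2) / step
    weights = np.hanning(n) * np.sinc(w_cutoff * x)
    weights /= np.sqrt(np.sum(weights**2))  # Normalise: sum of squares == 1
    return weights.astype(np.float32)
```

Normalising the sum of squares to 1 keeps the output power of white noise equal to its input power, so gains set elsewhere do not depend on step or taps.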