katgpucbf.fgpu.engine module
FEngine class, which combines all the processing steps for a single digitiser data stream.
- class katgpucbf.fgpu.engine.FEngine(*, katcp_host: str, katcp_port: int, context: ~katsdpsigproc.abc.AbstractContext, vkgdr_handle: <MagicMock id='140228911845920'>, srcs: str | list[tuple[str, int]], recv_interface: list[str] | None, recv_ibv: bool, recv_affinity: list[int], recv_comp_vector: list[int], recv_packet_samples: int, recv_buffer: int, send_interface: list[str], send_ttl: int, send_ibv: bool, send_packet_payload: int, send_affinity: int, send_comp_vector: int, send_buffer: int, outputs: list[~katgpucbf.fgpu.output.Output], adc_sample_rate: float, send_rate_factor: float, feng_id: int, n_ants: int, chunk_samples: int, chunk_jones: int, dig_sample_bits: int, send_sample_bits: int, max_delay_diff: int, gain: complex, sync_time: float, mask_timestamp: bool, use_vkgdr: bool, use_peerdirect: bool, monitor: ~katgpucbf.monitor.Monitor)[source]
Bases: Engine
Top-level class running the whole thing.
- Parameters:
katcp_host – Hostname or IP on which to listen for KATCP C&M connections.
katcp_port – Network port on which to listen for KATCP C&M connections.
context – The accelerator (CUDA) context to use for running the FEngine.
vkgdr_handle – Handle to vkgdr for the same device as context.
srcs – A list of source endpoints for the incoming data, or a pcap filename.
recv_interface – IP addresses of the network devices to use for input.
recv_ibv – Use ibverbs for input.
recv_affinity – List of CPU cores for input-handling threads. Must be one number per pol.
recv_comp_vector – Completion vectors for source streams, or -1 for polling. See spead2.recv.UdpIbvConfig for further information.
recv_packet_samples – The number of samples per digitiser packet.
recv_buffer – The size of the network receive buffer.
send_interface – IP addresses of the network devices to use for output.
send_ttl – TTL for outgoing packets.
send_ibv – Use ibverbs for output.
send_packet_payload – Size for output packets (voltage payload only, headers and padding are added to this).
send_affinity – CPU core for output-handling thread.
send_comp_vector – Completion vector for transmission, or -1 for polling. See spead2.send.UdpIbvConfig for further information.
send_buffer – Size of the network send buffer.
outputs – Output streams to generate.
adc_sample_rate – Digitiser sampling rate (in Hz), used to determine transmission rate.
send_rate_factor – Configure the SPEAD2 sender with a rate proportional to this factor. This value is intended to dictate a data transmission rate slightly higher than the ADC rate. Note: a factor of zero (0) tells the sender to transmit as fast as possible.
feng_id – ID of the F-engine indicating which one in the array this is. Included in the output heaps so that the X-engine can determine where the data fits in.
n_ants – The number of antennas in the array. Used for numbering heaps so as not to collide with other antennas transmitting to the same X-engine.
chunk_samples – Number of samples in each input chunk, excluding padding samples.
chunk_jones – Number of Jones vectors in each output chunk.
max_delay_diff – Maximum supported difference between delays across polarisations (in samples).
gain – Initial eq gain for all channels.
sync_time – UNIX time at which the digitisers were synced.
mask_timestamp – Mask off bottom bits of timestamp (workaround for broken digitiser).
use_vkgdr – Assemble chunks directly in GPU memory (requires Vulkan).
use_peerdirect – Send chunks directly from GPU memory (requires supported GPU).
monitor – Monitor to use for generating the multiple Queue objects needed to communicate between functions, and for handling basic reporting of Queue sizes and events.
- free_in_item(item: InQueueItem) None[source]
Return an InQueueItem to the free queue if its refcount hits zero.
- make_send_streams(output: Output, n_data_heaps: int, chunks: Sequence[Chunk]) list[spead2.send.asyncio.AsyncStream][source]
Create send streams for a pipeline.
This method should only be called by
Pipeline.
- async on_stop() None[source]
Shut down processing when the device server is stopped.
This is called by aiokatcp after closing the listening socket. It also handles any exceptions thrown unexpectedly in any of the processing loops.
- async request_delays(ctx, stream_name: str, start_time: Timestamp, *delays: str) None[source]
Add a new first-order polynomial to the delay and fringe correction model.
Todo
Make the request’s fail replies more informative in the case of malformed requests.
- async request_gain(ctx, stream_name: str, input: int, *values: str) tuple[str, ...][source]
Set or query the eq gains.
If no values are provided, the gains are simply returned.
- Parameters:
stream_name – Output stream name
input – Input number (0 or 1)
values – Complex values. There must either be a single value (used for all channels), or a value per channel.
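The "single value or one value per channel" rule can be illustrated with a small sketch. The helper name `expand_gains` and the `n_channels` argument are hypothetical and are not part of katgpucbf; the sketch only assumes that the request arguments are strings that Python's `complex()` can parse.

```python
def expand_gains(values: list[str], n_channels: int) -> list[complex]:
    """Hypothetical helper: turn request arguments into one gain per channel."""
    if len(values) not in (1, n_channels):
        raise ValueError(f"expected 1 or {n_channels} values, got {len(values)}")
    gains = [complex(v) for v in values]
    if len(gains) == 1:
        # A single value is used for all channels.
        gains = gains * n_channels
    return gains


if __name__ == "__main__":
    print(expand_gains(["1+2j"], 4))
```

Any other number of values is rejected, which mirrors the requirement stated in the parameter description.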
- async request_gain_all(ctx, stream_name: str, *values: str) None[source]
Set the eq gains for all inputs.
- Parameters:
stream_name – Output stream name
values – Complex values. There must either be a single value (used for all channels), or a value per channel, or "default" to reset gains to the default.
- async start(descriptor_interval_s: float = 5) None[source]
Start the engine.
This function adds the receive, processing and transmit tasks onto the event loop. It also adds a task to continuously send the descriptor heaps at an interval based on the descriptor_interval_s. See _run_descriptors_loop() for more details.
- Parameters:
descriptor_interval_s – The base interval used as a multiplier on feng_id and n_ants to dictate the initial ‘engine sleep interval’ and ‘send interval’ respectively.
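One reading of the description of descriptor_interval_s is sketched below. The function name `descriptor_timing` is hypothetical, and the simple multiplication is an assumption based only on the wording above, not on the implementation of `_run_descriptors_loop()`.

```python
def descriptor_timing(
    feng_id: int, n_ants: int, descriptor_interval_s: float = 5.0
) -> tuple[float, float]:
    """Hypothetical helper: derive the two intervals from the base interval."""
    # Assumption: each engine first sleeps in proportion to its own ID, so
    # engines in the same array start sending at different times.
    initial_sleep_s = feng_id * descriptor_interval_s
    # Assumption: each engine then repeats once per "round" of the array.
    send_interval_s = n_ants * descriptor_interval_s
    return initial_sleep_s, send_interval_s


if __name__ == "__main__":
    print(descriptor_timing(feng_id=2, n_ants=4))
```

Under this reading, engines in the same array send their descriptors one base interval apart rather than all at once.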
- class katgpucbf.fgpu.engine.InQueueItem(context: AbstractContext, layout: Layout, n_samples: int, timestamp: int = 0, *, use_vkgdr: bool = False)[source]
Bases: QueueItem
Item for use in input queues.
This Item references GPU memory regions for input samples from both polarisations, with metadata describing their dimensions (number of samples and bitwidth of samples) in addition to the features of QueueItem.
An example of usage is as follows:
    # In the receive function
    my_in_item.samples.set_region(...)  # start copying sample data to the GPU,
    my_in_item.add_marker(command_queue)
    self._in_queue.put_nowait(my_in_item)
    ...
    # in the processing function
    next_in_item = await self._in_queue.get()  # get the item from the queue
    next_in_item.enqueue_wait_for_events(command_queue)  # wait for its data to be completely copied
    ...  # carry on executing kernels or whatever needs to be done with the data
- Parameters:
context – CUDA context in which to allocate memory.
layout – Layout of the source stream.
n_samples (int) – Number of digitised samples to hold, per polarisation
timestamp (int) – Timestamp of the oldest digitiser sample represented in the data.
use_vkgdr – Use vkgdr to write sample data directly to the GPU rather than staging in host memory.
- property capacity: int
Memory capacity in samples.
The amount of space allocated to each polarisation stored in
samples.
- present_cumsum: ndarray
Cumulative sum over present (separately per pol). It is up to the caller to compute it at the appropriate time.
- reset(timestamp: int = 0) None[source]
Reset the item.
Set the timestamp to the given value (zero by default), empty the event list and set number of samples to zero.
- samples: DeviceArray | None
A device memory region for storing the raw samples.
- class katgpucbf.fgpu.engine.OutQueueItem(vkgdr_handle: <MagicMock id='140228911845920'>, compute: ~katgpucbf.fgpu.compute.Compute, spectra_samples: int, timestamp: int = 0)[source]
Bases: QueueItem
Item for use in output queues.
This Item references GPU memory regions for output spectra from both polarisations, together with the per-spectrum fine delays and phase offsets and the per-channel gains, in addition to the features of QueueItem.
An example of usage is as follows:
    # In the processing function
    compute.run_some_dsp(my_out_item.spectra)  # Run the DSP, whatever it is.
    my_out_item.add_marker(command_queue)
    self._out_queue.put_nowait(my_out_item)
    ...
    # in the transmit function
    next_out_item = await self._out_queue.get()  # get the item from the queue
    next_out_item.enqueue_wait_for_events(download_queue)  # wait for event indicating DSP is finished
    next_out_item.get_async(download_queue)  # Start copying data back to the host
    ...
    # Normally you'd put a marker on the queue again so that you know when the
    # copy is finished, but this needn't be attached to the item unless
    # there's another queue afterwards.
- Parameters:
compute – F-engine Operation Sequence detailing the DSP happening on the data, including details for buffers, context, shapes, slots, etc.
spectra_samples (int) – Number of ADC samples between spectra.
timestamp (int) – Timestamp of the first spectrum in the
OutQueueItem.
- dig_total_power: DeviceArray | None
Output sum of squared samples, per pol
- property end_timestamp: int
Past-the-end timestamp of the item.
This follows Python’s normal exclusive-end convention.
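A minimal sketch of what an exclusive-end timestamp means is given below. The `ItemSpan` class is hypothetical; it assumes that an item covers `n_spectra` spectra spaced `spectra_samples` ADC samples apart, which is inferred from the parameter descriptions rather than taken from the implementation.

```python
from dataclasses import dataclass


@dataclass
class ItemSpan:
    """Hypothetical stand-in for the timestamp bookkeeping of an item."""

    timestamp: int  # timestamp of the first spectrum
    n_spectra: int  # number of spectra currently held
    spectra_samples: int  # ADC samples between spectra

    @property
    def end_timestamp(self) -> int:
        # Exclusive end: the first timestamp that is NOT covered by the item,
        # just as range(start, stop) does not include stop.
        return self.timestamp + self.n_spectra * self.spectra_samples


if __name__ == "__main__":
    span = ItemSpan(timestamp=1000, n_spectra=4, spectra_samples=256)
    print(span.end_timestamp)
```

With this convention, an item holding no spectra has an end timestamp equal to its start timestamp.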
- fine_delay: MappedArray
Per-spectrum fine delays
- gains: MappedArray
Per-channel gains
- property next_timestamp: int
Timestamp of the next OutQueueItem after this one.
- phase: MappedArray
Per-spectrum phase offsets
- reset(timestamp: int = 0) None[source]
Reset the item.
Set the item’s timestamp to the given value (zero by default), empty the event list and set number of spectra to zero.
This does not zero the dig_total_power counters. Use reset_all() for that.
- reset_all(command_queue: AbstractCommandQueue, timestamp: int = 0) None[source]
Fully reset the item.
In addition to the work done by reset(), zero out GPU accumulators, using the given command queue. No events are associated with this operation; it is assumed that the same command queue will be used to subsequently operate on the accumulators.
- saturated: DeviceArray
Output saturation count, per pol
- spectra: DeviceArray
Output data, a collection of spectra, arranged in memory by pol and by heap.
- class katgpucbf.fgpu.engine.Pipeline(output: Output, engine: FEngine, context: AbstractContext, dig_stats: bool)[source]
Bases: object
Processing pipeline for a single output stream.
- Parameters:
output – The output stream to produce
engine – The owning engine
context – CUDA context for device work
dig_stats – If true, this pipeline is responsible for producing the digitiser statistics such as dig-rms-dbfs.
- add_in_item(item: InQueueItem) None[source]
Append a newly-received
InQueueItem.
- delay_update_timestamp() int[source]
Return a timestamp by which an update to the delay model will take effect.
- async run_processing() None[source]
Do the hard work of the F-engine.
This function takes place entirely on the GPU. First the coarse delay is applied, then a batch FFT operation, and finally fine delay, phase correction, quantisation and corner-turn are performed.
- async run_transmit() None[source]
Get the processed data from the GPU to the network.
This could be done either with or without PeerDirect. In the non-PeerDirect case, OutQueueItem objects are pulled from the _out_queue. We wait for the events that mark the end of the processing, then copy the data to host memory before turning it over to the sender for transmission on the network. The “empty” item is then returned to run_processing() via the _out_free_queue, and once the chunk has been transmitted it is returned to _send_free_queue.
In the PeerDirect case, the item and the chunk are bound together and share memory. In this case _send_free_queue is unused. The item is only returned to _out_free_queue once it has been fully transmitted.
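The item-recycling pattern of the non-PeerDirect case can be sketched with plain asyncio queues. This is an illustration only: the buffers are `bytearray` objects rather than GPU memory, `bytes(item)` stands in for the copy to host memory, the `None` sentinel is an assumption made for the example, and the separate _send_free_queue for chunks is left out.

```python
import asyncio


async def transmit(out_queue: asyncio.Queue, out_free_queue: asyncio.Queue, sent: list) -> None:
    """Consumer: copy each item out, then hand the empty item back."""
    while True:
        item = await out_queue.get()
        if item is None:  # sentinel chosen for this sketch: no more items
            break
        sent.append(bytes(item))  # stand-in for copying to host and sending
        out_free_queue.put_nowait(item)  # recycle the buffer for the producer


async def main() -> list:
    out_queue: asyncio.Queue = asyncio.Queue()
    out_free_queue: asyncio.Queue = asyncio.Queue()
    for _ in range(2):  # only two buffers exist, so they must be reused
        out_free_queue.put_nowait(bytearray(4))
    sent: list = []
    task = asyncio.create_task(transmit(out_queue, out_free_queue, sent))
    for value in range(5):
        item = await out_free_queue.get()  # waits until a buffer is free
        item[:] = bytes([value]) * 4  # stand-in for running the DSP
        out_queue.put_nowait(item)
    out_queue.put_nowait(None)
    await task
    return sent


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the producer can only obtain buffers from the free queue, it is throttled by the consumer, and no buffer is overwritten before its contents have been copied out.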
- set_gains(input: int, gains: ndarray) None[source]
Set the eq gains for one polarisation and update the sensor.
The gains must contain one entry per channel; the shortcut of supplying a single value is handled by
request_gain().
- shutdown() None[source]
Start graceful shutdown after the final call to
add_in_item().
- static update_delay_sensor(delay_models: Sequence[LinearDelayModel], *, delay_sensor: Sensor, adc_sample_rate: float) None[source]
Update the delay sensor upon loading of a new model.
The delay_models are accepted as a read-only Sequence from the MultiDelayModel, even though only the first one is needed to update the sensor.
The delay and phase-rate values need to be scaled back to their original values (delay (s), phase-rate (rad/s)).
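The scaling can be sketched as follows, under the assumption (not stated above) that the internal model stores the delay in ADC samples and the phase rate in radians per sample. The function name `to_original_units` is hypothetical.

```python
def to_original_units(
    delay_samples: float, phase_rate_per_sample: float, adc_sample_rate: float
) -> tuple[float, float]:
    """Hypothetical helper: convert per-sample units back to SI units."""
    # Assumption: the delay is held in samples, so dividing by the sample
    # rate (samples per second) gives seconds.
    delay_s = delay_samples / adc_sample_rate
    # Assumption: the phase rate is held in radians per sample, so
    # multiplying by the sample rate gives radians per second.
    phase_rate_rad_s = phase_rate_per_sample * adc_sample_rate
    return delay_s, phase_rate_rad_s


if __name__ == "__main__":
    print(to_original_units(1712.0, 1e-9, 1712e6))
```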
- katgpucbf.fgpu.engine.dig_rms_dbfs_status(value: float) Status[source]
Compute status for dig-rms-dbfs sensor.
- katgpucbf.fgpu.engine.format_complex(value: Complex) str[source]
Format a complex number for a katcp request.
The ICD specifies that complex numbers have the format real+imaginary j. Python’s default formatting contains parentheses if real is non-zero, and omits the real part if it is zero. For a numpy value it also only includes enough significant figures for the dtype, which means that reading it back as a Python complex may not give exactly the same value.
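A minimal sketch of one way to produce the described format is shown below. It is not necessarily how format_complex is implemented: the name `format_complex_sketch` is hypothetical, and the choice to rely on Python's shortest round-trip float formatting is an assumption.

```python
def format_complex_sketch(value: complex) -> str:
    """Hypothetical sketch: format as real+imaginary j, without parentheses."""
    # Convert to a Python complex first so that both parts are ordinary
    # Python floats, whose default formatting round-trips exactly.
    value = complex(value)
    # "+" forces an explicit sign on the imaginary part; the real part is
    # always written, even when it is zero.
    return f"{value.real}{value.imag:+}j"


if __name__ == "__main__":
    print(format_complex_sketch(1 + 2j))  # 1.0+2.0j
    print(format_complex_sketch(3j))  # 0.0+3.0j
```

For comparison, `str(1 + 2j)` gives `(1+2j)` and `str(3j)` gives `3j`, which are the two behaviours of the default formatting that the description mentions.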
- katgpucbf.fgpu.engine.generate_ddc_weights(output: NarrowbandOutput, adc_sample_rate: float) ndarray[source]
Generate filter weights for the narrowband low-pass filter.
- katgpucbf.fgpu.engine.generate_pfb_weights(step: int, taps: int, w_cutoff: float, window_function: WindowFunction) ndarray[source]
Generate Hann-window weights for the F-engine’s PFB-FIR.
The resulting weights are normalised such that the sum of squares is 1.
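The normalisation can be illustrated with a sketch of a Hann-windowed sinc filter. The exact sinc argument, the name `pfb_weights_sketch` and the use of plain Python lists are assumptions made for illustration. The sketch also handles only the Hann window and ignores the window_function parameter.

```python
import math


def pfb_weights_sketch(step: int, taps: int, w_cutoff: float) -> list[float]:
    """Hypothetical sketch: Hann-windowed sinc weights with unit sum of squares."""
    window_size = step * taps
    if window_size < 2:
        raise ValueError("step * taps must be at least 2")
    weights = []
    for i in range(window_size):
        # Hann window over the whole filter length.
        hann = math.sin(math.pi * i / (window_size - 1)) ** 2
        # Sinc centred on the middle of the filter (assumed form);
        # w_cutoff scales the width of the channel response.
        x = w_cutoff * ((i + 0.5) / step - taps / 2)
        sinc = 1.0 if x == 0 else math.sin(math.pi * x) / (math.pi * x)
        weights.append(hann * sinc)
    # Normalise so that the sum of squares is 1.
    norm = math.sqrt(sum(w * w for w in weights))
    return [w / norm for w in weights]


if __name__ == "__main__":
    w = pfb_weights_sketch(step=8, taps=4, w_cutoff=1.0)
    print(len(w), sum(x * x for x in w))
```

The result has step × taps entries, one weight per input sample that contributes to a spectrum.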
- Parameters:
step – Number of samples per spectrum.
taps – Number of taps in the PFB-FIR.
w_cutoff – Scaling factor for the width of the channel response.
window_function – Window function to use.
- Returns:
Array containing the weights for the PFB-FIR filters, as single-precision floats.
- Return type: