################################################################################
# Copyright (c) 2020-2026, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Define the objects that implement an entire GPU XB-Engine pipeline.
The XBEngine comprises multiple Pipeline objects that facilitate output data
products. Additionally, several subclasses of :class:`.QueueItem` are defined
for passing data between parts of the engine.
"""
import asyncio
import logging
import math
import time
from abc import abstractmethod
from collections.abc import Sequence
from random import SystemRandom
import aiokatcp
import katsdpsigproc
import katsdpsigproc.resource
import numpy as np
import spead2.recv
import vkgdr.pycuda
from katsdpsigproc import accel
from katsdpsigproc.abc import AbstractContext
from numpy.typing import NDArray
from .. import (
COMPLEX,
DESCRIPTOR_TASK_NAME,
ENGINE_DITHER_SEED_BITS,
GPU_PROC_TASK_NAME,
N_POLS,
RECV_TASK_NAME,
SEND_TASK_NAME,
SPEAD_DESCRIPTOR_INTERVAL_S,
)
from .. import recv as base_recv
from ..mapped_array import MappedArray
from ..monitor import Monitor
from ..queue_item import QueueItem
from ..recv import RECV_SENSOR_TIMEOUT_CHUNKS, RECV_SENSOR_TIMEOUT_MIN
from ..ringbuffer import ChunkRingbuffer
from ..send import DescriptorSender
from ..utils import Engine, TimeConverter, make_rate_limited_sensor
from . import DEFAULT_BPIPELINE_NAME, DEFAULT_N_IN_ITEMS, DEFAULT_N_OUT_ITEMS, DEFAULT_XPIPELINE_NAME, recv
from .beamform import Beam, BeamformTemplate
from .bsend import BSend
from .bsend import make_stream as make_bstream
from .correlation import CorrelationTemplate
from .output import BOutput, Output, XOutput
from .send import Send
from .xsend import XSend, incomplete_accum_counter, skipped_accum_counter
from .xsend import make_stream as make_xstream
logger = logging.getLogger(__name__)
[docs]
class InQueueItem(QueueItem):
"""
Extension of the QueueItem for use in receive queues.
The InQueueItem holds a reference to the received Chunk because its data is
copied into the GPU buffer asynchronously. Once the GPU processing loop is
done with the Chunk's data, it hands it back to the receiving loop to reuse
the resource. It is for this reason that the Chunk's heap presence is
stored separately.
The InQueueItem also holds a reference count of the number of pipelines still
using this item. This is to ensure the item isn't freed before each pipeline
has had a chance to use the received data.
"""
def __init__(self, buffer_device: accel.DeviceArray, present: np.ndarray, timestamp: int = 0) -> None:
self.buffer_device = buffer_device
self.present = present
self.refcount = 0
super().__init__(timestamp=timestamp)
[docs]
def reset(self, timestamp: int = 0) -> None:
"""Reset the timestamp and chunk."""
super().reset(timestamp)
self.chunk: recv.Chunk | None = None
[docs]
class XOutQueueItem(QueueItem):
"""
Extension of the QueueItem to track antennas that have missed data.
The XOutQueueItem between the gpu-proc and sender loops needs to carry a
record of which antennas have missed data at any point in the accumulation
being processed. This is used to determine whether any output products were
affected, and have their data zeroed accordingly.
"""
def __init__(
self,
buffer_device: accel.DeviceArray,
saturated: accel.DeviceArray,
present_ants: np.ndarray,
present_baselines: MappedArray,
timestamp: int = 0,
) -> None:
self.buffer_device = buffer_device
self.saturated = saturated
self.present_ants = present_ants
self.present_baselines = present_baselines
super().__init__(timestamp)
[docs]
def reset(self, timestamp: int = 0) -> None:
"""Reset the timestamp and present antenna tracker."""
super().reset(timestamp=timestamp)
self.present_ants.fill(True) # Assume they're fine until told otherwise
self.batches = 0
[docs]
def update_present_baselines(self) -> None:
"""Recompute present_baselines from present_ants."""
# See Correlation.get_baseline_index for the ordering.
# We do a column of the triangle at a time for efficiency.
n_ants = len(self.present_ants)
offset = 0
for i in range(n_ants):
section = self.present_baselines.host[offset : offset + i + 1]
if self.present_ants[i]:
section[:] = self.present_ants[: i + 1]
else:
section[:] = 0
offset += len(section)
[docs]
class BOutQueueItem(QueueItem):
"""Transmit queue item for the beamformer pipeline.
The `out`, `saturated`, `rand_states`, `weights` and `delays` must have the
same shape and dtype as the corresponding slots in :class:`.Beamform`. This
class needs to carry a record of which antennas have missed data in the
current :class:`~katgpucbf.recv.Chunk` of data. This is used to determine
whether any beam data has been affected, and have their data flagged
accordingly.
Parameters
----------
out
An int8 type :class:`~katsdpsigproc.accel.DeviceArray` with shape
(batches, ants, channels, spectra_per_batch, N_POLS, COMPLEX).
saturated
An uint32 type :class:`~katsdpsigproc.accel.DeviceArray` with shape
(n_beams,).
present
A boolean array with shape (heaps_per_fengine_per_chunk, n_ants).
weights
An np.complex64 :class:`~katgpucbf.mapped_array.MappedArray` with
shape (n_ants, n_beams).
delays
An np.float32 :class:`~katgpucbf.mapped_array.MappedArray` with shape
(n_ants, n_beams).
timestamp
ADC sample count of the received Chunk.
.. todo::
Potentially create (yet another) base class for B- and XOutQueueItems.
"""
def __init__(
self,
out: accel.DeviceArray,
saturated: accel.DeviceArray,
present: np.ndarray,
weights: MappedArray,
delays: MappedArray,
timestamp: int = 0,
) -> None:
self.out = out
self.saturated = saturated
self.present = present
self.weights = weights
self.delays = delays
#: Version of weights and delays (for comparison to BPipeline._weights_version)
self.weights_version = -1
super().__init__(timestamp)
[docs]
def reset(self, timestamp: int = 0) -> None:
"""Reset the timestamp and the present antenna tracker."""
super().reset(timestamp=timestamp)
self.present.fill(True)
[docs]
class Pipeline[O: Output, T: QueueItem]:
r"""Base Pipeline class to build on.
SPEAD heaps utilised by this pipeline are first received at the
:class:`.XBEngine`-level and uploaded to the GPU for processing.
The pipeline then performs GPU-accelerated processing on the uploaded
data before packetising and sending the results back out onto the network.
Data processing occurs across three separate async methods. Data is passed
between these methods using :class:`asyncio.Queue`\s. See each method for
more details.
- :meth:`.XBEngine._receiver_loop`,
- :meth:`.gpu_proc_loop`,
- :meth:`.sender_loop`.
Items passed between queues may still have GPU operations in progress.
:class:`.QueueItem` provides mechanisms to wait for the in-progress work to
complete.
Parameters
----------
outputs
Sequence of Output config for data product (BOutput or XOutput).
name
Name of Pipeline.
engine
The owning engine.
context
CUDA context for device work.
"""
# NOTE: n_in_item and n_out_items dictate the number of GPU buffers.
# Setting these values too high results in too much GPU memory being
# consumed. The quantity of each needs to be sufficient so as not to starve
# the different processing loops. The low single digits is suitable.
# These values are not configurable as they have been acceptable for
# most tests cases up until now. If the pipeline starts bottlenecking,
# then maybe look at increasing these values.
n_in_items = DEFAULT_N_IN_ITEMS
n_out_items = DEFAULT_N_OUT_ITEMS
send_stream: Send
def __init__(self, outputs: Sequence[O], name: str, engine: "XBEngine", context: AbstractContext) -> None:
self.outputs = outputs
self.name = name
self.engine = engine
self._proc_command_queue = context.create_command_queue()
self._download_command_queue = context.create_command_queue()
# These queues are extended in the monitor class, allowing for the
# monitor to track the number of items on each queue.
# - The _in_queue receives items from :meth:`.XBEngine._receiver_loop`
# to be used by :meth:`gpu_proc_loop`.
# - The _out_queue receives items from :meth:`gpu_proc_loop` to be
# used by :meth:`sender_loop`.
# Once the destination function is finished with an item, it will pass
# it back to the corresponding free-item queue to ensure that all
# allocated buffers are in continuous circulation.
# NOTE: Pipelines must not place :class:`InQueueItem`\ s directly back on
# the `_in_free_queue` as multiple pipelines will hold
# references to a single :class:`InQueueItem`. Instead, invoke
# :meth:`.XBEngine.free_in_item` to indicate this Pipeline no longer
# holds a reference to the item.
self._in_queue: asyncio.Queue[InQueueItem | None] = engine.monitor.make_queue(
f"{name}.in_queue", self.n_in_items
)
self._out_queue: asyncio.Queue[T | None] = engine.monitor.make_queue(f"{name}.out_queue", self.n_out_items)
self._out_free_queue: asyncio.Queue[T] = engine.monitor.make_queue(f"{name}.out_free_queue", self.n_out_items)
[docs]
def add_in_item(self, item: InQueueItem) -> None:
"""Append a newly-received :class:`InQueueItem` to the :attr:`_in_queue`."""
self._in_queue.put_nowait(item)
[docs]
def shutdown(self) -> None:
"""Start a graceful shutdown after the final call to :meth:`add_in_item`."""
self._in_queue.put_nowait(None)
[docs]
@abstractmethod
async def gpu_proc_loop(self) -> None:
"""Perform all GPU processing of received data in a continuous loop.
This method does the following:
- Get an InQueueItem off the in_queue
- Ensure it is not a NoneType value (indicating shutdown sequence)
- await any outstanding events associated with the InQueueItem
- Apply GPU processing to data in the InQueueItem
- Bind input buffer(s) accordingly
- Obtain a free QueueItem from the out_free_queue
- Add event marker to wait for the proc_command_queue
- Put the prepared QueueItem on the out_queue
NOTE: An initial QueueItem needs to be obtained from the out_free_queue
for the first round of processing:
- The gpu_proc_loop requires logic to decipher the timestamp of the
first heap output.
- It also provides an opportunity to bind buffers before processing is queued.
"""
raise NotImplementedError # pragma: nocover
[docs]
@abstractmethod
async def sender_loop(self) -> None:
"""Send heaps to the network in a continuous loop.
This method does the following:
- Get a QueueItem from the out_queue
- Ensure it is not a NoneType value (indicating shutdown sequence)
- Wait for events on the item to complete (likely GPU processing)
- Wait for an available heap buffer from the send_stream
- Asynchronously Transfer/Download GPU buffer data into the heap buffer in system RAM
- Wait for the transfer to complete, before
- Transmit heap buffer onto the network
- Place the QueueItem back on the out_free_queue once complete
"""
raise NotImplementedError # pragma: nocover
[docs]
@abstractmethod
def capture_enable(self, *, stream_id: int, enable: bool = True, timestamp: int = 0) -> None:
"""Enable/Disable the transmission of a data product's stream."""
raise NotImplementedError # pragma: nocover
[docs]
class BPipeline(Pipeline[BOutput, BOutQueueItem]):
"""Processing pipeline for a collection of :class:`.output.BOutput`."""
def __init__(
self,
outputs: Sequence[BOutput],
engine: "XBEngine",
context: AbstractContext,
vkgdr_handle: vkgdr.Vkgdr,
init_send_enabled: bool,
name: str = DEFAULT_BPIPELINE_NAME,
) -> None:
super().__init__(outputs, name, engine, context)
template = BeamformTemplate(
context,
[Beam(pol=output.pol, dither=output.dither) for output in outputs],
n_spectra_per_batch=engine.recv_layout.n_spectra_per_heap,
)
seed = SystemRandom().randrange(2**ENGINE_DITHER_SEED_BITS)
self._beamform = template.instantiate(
self._proc_command_queue,
n_batches=engine.heaps_per_fengine_per_chunk,
n_ants=engine.n_ants,
n_channels_per_substream=engine.n_channels_per_substream,
seed=seed,
sequence_first=engine.channel_offset_value,
sequence_step=engine.n_channels,
)
allocator = accel.DeviceAllocator(context=context)
for _ in range(self.n_out_items):
out = self._beamform.slots["out"].allocate(allocator=allocator, bind=False)
saturated = self._beamform.slots["saturated"].allocate(allocator=allocator, bind=False)
present = np.zeros(shape=(engine.heaps_per_fengine_per_chunk, engine.n_ants), dtype=bool)
weights = MappedArray.from_slot(vkgdr_handle, context, self._beamform.slots["weights"])
delays = MappedArray.from_slot(vkgdr_handle, context, self._beamform.slots["delays"])
out_item = BOutQueueItem(out, saturated, present, weights, delays)
self._out_free_queue.put_nowait(out_item)
# These are the original weights, delays and gains as provided by the
# user, rather than the processed values passed to the kernel.
self._weights = np.ones((len(outputs), engine.n_ants), np.float64)
self._delays = np.zeros((len(outputs), engine.n_ants, 2), np.float64)
self._quant_gains = np.ones(len(outputs), np.float64)
self._weights_version = 1
# Timestamp which will include effect of a weights update made now
self._weights_steady = 0
# Note: there is no particular reason that n_chunks should match
# n_out_items, other than it being a reasonable value of the depth
# of any queue.
self.send_stream: BSend = BSend(
outputs=outputs,
batches_per_chunk=engine.heaps_per_fengine_per_chunk,
n_chunks=self.n_out_items,
n_channels=engine.n_channels,
n_channels_per_substream=engine.n_channels_per_substream,
spectra_per_heap=engine.recv_layout.n_spectra_per_heap,
adc_sample_rate=engine.adc_sample_rate,
timestamp_step=engine.recv_heap_timestamp_step,
send_rate_factor=engine.send_rate_factor,
channel_offset=engine.channel_offset_value,
context=context,
packet_payload=engine.send_packet_payload,
stream_factory=lambda stream_config, buffers: make_bstream(
output_names=[output.name for output in outputs],
endpoints=[output.dst for output in outputs],
interface=engine.send_interface,
ttl=engine.send_ttl,
use_ibv=engine.send_ibv,
affinity=engine.send_affinity,
comp_vector=engine.send_comp_vector,
buffer_size=engine.send_buffer,
stream_config=stream_config,
buffers=buffers,
),
send_enabled=init_send_enabled,
)
self._populate_sensors(seed)
def _populate_sensors(self, seed: int) -> None:
sensors = self.engine.sensors
for i, output in enumerate(self.outputs):
# Static sensors
sensors.add(
aiokatcp.Sensor(
str,
f"{output.name}.chan-range",
"The range of channels processed by this B-engine, inclusive",
default=f"({self.engine.channel_offset_value},"
f"{self.engine.channel_offset_value + self.engine.n_channels_per_substream - 1})",
initial_status=aiokatcp.Sensor.Status.NOMINAL,
)
)
sensors.add(
aiokatcp.Sensor(
str,
f"{output.name}.dither-seed",
"Random seed used in dithering for quantisation",
default=str(seed),
initial_status=aiokatcp.Sensor.Status.NOMINAL,
)
)
# Dynamic sensors
default_delays_str = ", ".join(str(value) for value in self._delays[i].flatten())
sensors.add(
aiokatcp.Sensor(
str,
f"{output.name}.delay",
"The delay settings of the inputs for this beam. Each input has "
"a delay [s] and phase [rad]: (loadmcnt, delay0, phase0, delay1, "
"phase1, ...)",
default=f"(0, {default_delays_str})",
initial_status=aiokatcp.Sensor.Status.NOMINAL,
)
)
sensors.add(
aiokatcp.Sensor(
float,
f"{output.name}.quantiser-gain",
"Non-complex post-summation quantiser gain applied to this beam",
default=self._quant_gains[i],
initial_status=aiokatcp.Sensor.Status.NOMINAL,
)
)
sensors.add(
aiokatcp.Sensor(
str,
f"{output.name}.weight",
"The summing weights applied to all the inputs of this beam",
# Cast to list first to add comma delimiter
default=str(self._weights[i].tolist()),
initial_status=aiokatcp.Sensor.Status.NOMINAL,
)
)
sensors.add(
make_rate_limited_sensor(
int,
f"{output.name}.beng-clip-cnt",
"Number of complex samples that saturated.",
default=0,
initial_status=aiokatcp.Sensor.Status.NOMINAL,
)
)
sensors.add(
make_rate_limited_sensor(
int,
f"{output.name}.tx.next-timestamp",
"Timestamp (in samples) that has not yet been sent. This "
"is strictly greater than any timestamp of the previous "
"capture and less than or equal to any timestamp of the "
"following capture.",
default=0,
initial_status=aiokatcp.Sensor.Status.NOMINAL,
)
)
async def _get_in_item(self) -> InQueueItem | None:
"""Get the next :class:`InQueueItem`.
This is wrapped in a method so that it can be mocked.
"""
return await self._in_queue.get()
[docs]
async def gpu_proc_loop(self) -> None: # noqa: D102
while True:
# Get item from the receiver loop.
# - Wait for the HtoD transfers to complete, then
# - Give the chunk back to the receiver for reuse.
in_item = await self._get_in_item()
if in_item is None:
break
await in_item.async_wait_for_events()
out_item = await self._out_free_queue.get()
await out_item.async_wait_for_events()
out_item.reset(in_item.timestamp)
# After this point it's too late for set_weights etc to update
# the weights for this timestamp.
self._weights_steady = (
in_item.timestamp + self.engine.heaps_per_fengine_per_chunk * self.engine.recv_heap_timestamp_step
)
# Recompute the weights and delays if necessary
if out_item.weights_version != self._weights_version:
channel_spacing = self.engine.bandwidth / self.engine.n_channels
# The user provides a fringe phase for the centre frequency. We
# need to adjust that to the target fringe phase for the first
# channel processed by this engine (channel_offset_value).
centre_channel = self.engine.n_channels / 2
fringe_scale = -2 * np.pi * channel_spacing * (self.engine.channel_offset_value - centre_channel)
fringe_phase = self._delays.T[1] + fringe_scale * self._delays.T[0]
fringe_rotator = np.exp(1j * fringe_phase)
out_item.weights.host[:] = self._weights.T * self._quant_gains * fringe_rotator
# The factor of 2 combines with the factor of pi used by the
# kernel to give a factor of 2pi, to convert cycles to radians.
# The minus sign is because delaying the wave results in a
# decrease in the phase at a fixed time.
out_item.delays.host[:] = -2 * channel_spacing * self._delays.T[0]
out_item.weights_version = self._weights_version
# Queue GPU work
out_item.saturated.zero(self._proc_command_queue)
self._beamform.bind(
**{
"in": in_item.buffer_device,
"out": out_item.out,
"saturated": out_item.saturated,
"weights": out_item.weights.device,
"delays": out_item.delays.device,
}
)
self._beamform()
out_item.present[:] = in_item.present
out_item.add_marker(self._proc_command_queue)
self._out_queue.put_nowait(out_item)
# Finish with the in_item
in_item.add_marker(self._proc_command_queue)
self.engine.free_in_item(in_item)
# When the stream is closed, if the sender loop is waiting for an out item,
# it will never exit. Upon receiving this NoneType, the sender_loop can
# stop waiting and exit.
logger.debug("gpu_proc_loop completed")
self._out_queue.put_nowait(None)
[docs]
async def sender_loop(self) -> None: # noqa: D102
# NOTE: This function passes the entire downloaded data to
# chunk.send, which then takes care of directing data to each beam's
# output destination.
while True:
item = await self._out_queue.get()
if item is None:
break
# The CPU doesn't need to wait, but the GPU does to ensure it
# won't start the download before computation is done.
item.enqueue_wait_for_events(self._download_command_queue)
chunk = await self.send_stream.get_free_chunk()
item.out.get_async(self._download_command_queue, chunk.data)
item.saturated.get_async(self._download_command_queue, chunk.saturated)
event = self._download_command_queue.enqueue_marker()
await katsdpsigproc.resource.async_wait_for_events([event])
np.sum(item.present, axis=1, dtype=np.uint64, out=chunk.present_ants)
chunk.timestamp = item.timestamp
self.send_stream.send_chunk(chunk, self.engine.time_converter, self.engine.sensors)
self._out_free_queue.put_nowait(item)
await self.send_stream.send_stop_heap()
logger.debug("sender_loop completed")
[docs]
def capture_enable(self, *, stream_id: int, enable: bool = True, timestamp: int = 0) -> None: # noqa: D102
self.send_stream.enable_beam(beam_id=stream_id, enable=enable, timestamp=timestamp)
def _weights_updated(self) -> int:
"""Update version tracking when weight-related parameters are updated.
Returns
-------
int
:attr:`_weights_steady`, the timestamp that includes the effect of
a beam request update made now.
"""
self._weights_version += 1
self.engine.update_steady_state_timestamp(self._weights_steady)
return self._weights_steady
[docs]
def set_weights(self, stream_id: int, weights: np.ndarray) -> int:
"""Set the beam weights for one beam.
Parameters
----------
stream_id
The index of the beam whose weights are being set.
weights
A 1D array containing real-valued weights (per input).
Returns
-------
int
:attr:`_weights_steady`, the timestamp that includes the effect of
a beam-weights update made now.
"""
self._weights[stream_id] = weights
return self._weights_updated()
[docs]
def set_quant_gain(self, stream_id: int, gain: float) -> int:
"""Set the quantisation gain for one beam.
Parameters
----------
stream_id
The index of the beam whose quantisation gain is being set.
gain
Real-valued quantisation gain.
Returns
-------
int
:attr:`_weights_steady`, the timestamp that includes the effect of
a beam-quant-gain update made now.
"""
self._quant_gains[stream_id] = gain
return self._weights_updated()
[docs]
def set_delays(self, stream_id: int, delays: np.ndarray) -> int:
"""Set the beam steering delays for one beam.
Parameters
----------
stream_id
The index of the beam whose quantisation gain is being set.
delays
A 2D array of delay coefficients. The first axis corresponds to
inputs. The second axis has length two, with the first element
containing the delay in seconds, and the second containing a
channel-independent phase rotation in radians.
Returns
-------
int
:attr:`_weights_steady`, the timestamp that includes the effect of
a beam-delay update made now.
"""
self._delays[stream_id] = delays
return self._weights_updated()
[docs]
class XPipeline(Pipeline[XOutput, XOutQueueItem]):
"""Processing pipeline for a single baseline-correlation-products stream."""
def __init__(
self,
output: XOutput,
engine: "XBEngine",
context: AbstractContext,
vkgdr_handle: vkgdr.Vkgdr,
init_send_enabled: bool,
name: str = DEFAULT_XPIPELINE_NAME,
) -> None:
super().__init__([output], name, engine, context)
self.timestamp_increment_per_accumulation = output.heap_accumulation_threshold * engine.recv_heap_timestamp_step
# NOTE: This value staggers the send so that packets within a heap are
# transmitted onto the network across the entire time between dumps.
# Care needs to be taken to ensure that this rate is not set too high.
# If it is too high, the entire pipeline will stall needlessly waiting
# for packets to be transmitted too slowly.
self.dump_interval_s = self.timestamp_increment_per_accumulation / engine.adc_sample_rate
correlation_template = CorrelationTemplate(
context=context,
n_ants=engine.n_ants,
n_channels_per_substream=engine.n_channels_per_substream,
n_spectra_per_heap=engine.recv_layout.n_spectra_per_heap,
input_sample_bits=engine.sample_bits,
)
self.correlation = correlation_template.instantiate(
self._proc_command_queue, n_batches=engine.recv_layout.heaps_per_fengine_per_chunk
)
allocator = accel.DeviceAllocator(context=context)
for _ in range(self.n_out_items):
buffer_device = self.correlation.slots["out_visibilities"].allocate(allocator, bind=False)
saturated = self.correlation.slots["out_saturated"].allocate(allocator, bind=False)
present_ants = np.zeros(shape=(engine.n_ants,), dtype=bool)
present_baselines = MappedArray.from_slot(
vkgdr_handle, context, self.correlation.slots["present_baselines"]
)
out_item = XOutQueueItem(buffer_device, saturated, present_ants, present_baselines)
self._out_free_queue.put_nowait(out_item)
self.send_stream: XSend = XSend(
output_name=output.name,
n_ants=engine.n_ants,
n_channels=engine.n_channels,
n_channels_per_substream=engine.n_channels_per_substream,
dump_interval_s=self.dump_interval_s,
send_rate_factor=engine.send_rate_factor,
channel_offset=engine.channel_offset_value,
context=context,
packet_payload=engine.send_packet_payload,
stream_factory=lambda stream_config, buffers: make_xstream(
output_name=output.name,
dest_ip=output.dst.host,
dest_port=output.dst.port,
interface_ip=engine.send_interface,
ttl=engine.send_ttl,
use_ibv=engine.send_ibv,
affinity=engine.send_affinity,
comp_vector=engine.send_comp_vector,
buffer_size=engine.send_buffer,
stream_config=stream_config,
buffers=buffers,
),
send_enabled=init_send_enabled,
)
self._populate_sensors()
@property
def output(self) -> XOutput:
"""The single :class:`Output` produced by this pipeline."""
return self.outputs[0]
def _populate_sensors(self) -> None:
sensors = self.engine.sensors
# Static sensors
sensors.add(
aiokatcp.Sensor(
str,
f"{self.output.name}.chan-range",
"The range of channels processed by this X-engine, inclusive",
default=f"({self.engine.channel_offset_value},"
f"{self.engine.channel_offset_value + self.engine.n_channels_per_substream - 1})",
initial_status=aiokatcp.Sensor.Status.NOMINAL,
)
)
# Dynamic sensors
sensors.add(
aiokatcp.Sensor(
bool,
f"{self.output.name}.rx.synchronised",
"For the latest accumulation, was data present from all F-Engines.",
default=False,
initial_status=aiokatcp.Sensor.Status.ERROR,
status_func=lambda value: aiokatcp.Sensor.Status.NOMINAL if value else aiokatcp.Sensor.Status.ERROR,
)
)
sensors.add(
aiokatcp.Sensor(
int,
f"{self.output.name}.xeng-clip-cnt",
"Number of visibilities that saturated",
default=0,
initial_status=aiokatcp.Sensor.Status.NOMINAL,
)
)
sensors.add(
aiokatcp.Sensor(
int,
f"{self.output.name}.tx.next-timestamp",
"Timestamp (in samples) that has not yet been sent. This "
"is strictly greater than any timestamp of the previous "
"capture and less than or equal to any timestamp of the "
"following capture.",
default=0,
initial_status=aiokatcp.Sensor.Status.NOMINAL,
)
)
async def _flush_accumulation(self, out_item: XOutQueueItem, next_accum: int) -> XOutQueueItem:
"""Emit the current `out_item` and prepare a new one."""
next_timestamp = next_accum * self.timestamp_increment_per_accumulation
if out_item.batches == 0:
# We never actually started this accumulation. We can just
# update the timestamp and continue using it.
out_item.timestamp = next_timestamp
return out_item
# present_ants only takes into account batches that have
# been seen. If some batches went missing entirely, the
# whole accumulation is bad.
if out_item.batches != self.output.heap_accumulation_threshold:
out_item.present_ants.fill(False)
# Update the sync sensor (converting np.bool_ to Python bool)
# Note: the sensor timestamp is made the end of the current
# accumulation, which is usually the same as next_timestamp
# but might be different if entire accumulations were skipped.
self.engine.sensors[f"{self.output.name}.rx.synchronised"].set_value(
value=bool(out_item.present_ants.all()),
timestamp=self.engine.time_converter.adc_to_unix(
out_item.timestamp + self.timestamp_increment_per_accumulation
),
)
out_item.update_present_baselines()
self.correlation.reduce()
out_item.add_marker(self._proc_command_queue)
self._out_queue.put_nowait(out_item)
# Prepare for the next accumulation (which might not be
# contiguous with the previous one).
out_item = await self._out_free_queue.get()
await out_item.async_wait_for_events()
out_item.reset(next_timestamp)
self.correlation.bind(
out_visibilities=out_item.buffer_device,
out_saturated=out_item.saturated,
present_baselines=out_item.present_baselines.device,
)
self.correlation.zero_visibilities()
return out_item
[docs]
async def gpu_proc_loop(self) -> None: # noqa: D102
# NOTE: The ratio of in_items to out_items is not one-to-one; there are expected
# to be many more in_items in for every out_item out. For this reason, and in
# addition to the steps outlined in :meth:`.Pipeline.gpu_proc_loop`, data is
# only transferred to a `XOutQueueItem` once sufficient correlations have occurred.
in_item: InQueueItem | None
def do_correlation() -> None:
"""Apply correlation kernel to all pending batches."""
first_batch = self.correlation.first_batch
last_batch = self.correlation.last_batch
assert in_item is not None
present = in_item.present[first_batch:last_batch, :]
if first_batch < last_batch and present.any():
self.correlation()
# Update the present ants tracker one last time
out_item.present_ants[:] &= present.all(axis=0)
out_item.batches += last_batch - first_batch
self.correlation.first_batch = last_batch
out_item = await self._out_free_queue.get()
await out_item.async_wait_for_events()
# Indicate that the timestamp still needs to be filled in.
out_item.timestamp = -1
self.correlation.bind(
out_visibilities=out_item.buffer_device,
out_saturated=out_item.saturated,
present_baselines=out_item.present_baselines.device,
)
self.correlation.zero_visibilities()
while True:
# Get item from the receiver function.
# - Wait for the HtoD transfers to complete, then
# - Give the chunk back to the receiver for reuse.
in_item = await self._in_queue.get()
if in_item is None:
break
await in_item.async_wait_for_events()
current_timestamp = in_item.timestamp
if out_item.timestamp < 0:
# First heap seen. Round the timestamp down to the previous
# accumulation boundary
out_item.timestamp = (
current_timestamp
// self.timestamp_increment_per_accumulation
* self.timestamp_increment_per_accumulation
)
self.correlation.bind(in_samples=in_item.buffer_device)
# Initially no work to do; as each batch is examined, last_batch
# is extended.
self.correlation.first_batch = 0
self.correlation.last_batch = 0
for i in range(self.engine.heaps_per_fengine_per_chunk):
# NOTE: The timestamp representing the end of an
# accumulation does not necessarily line up with the chunk
# timestamp. It will line up with a specific batch within a
# chunk though, this is why this check has to happen for each
# batch. This check is the equivalent of the MeerKAT SKARAB
# X-Engine auto-resync logic.
current_accum = current_timestamp // self.timestamp_increment_per_accumulation
send_accum = out_item.timestamp // self.timestamp_increment_per_accumulation
if current_accum != send_accum:
do_correlation()
out_item = await self._flush_accumulation(out_item, current_accum)
self.correlation.last_batch = i + 1
current_timestamp += self.engine.recv_heap_timestamp_step
do_correlation()
# If the last batch of the chunk was also the last batch of the
# accumulation, we can flush it now without waiting for more data.
# This is mostly a convenience for unit tests, since in practice
# we'd expect to see more data soon.
current_accum = current_timestamp // self.timestamp_increment_per_accumulation
send_accum = out_item.timestamp // self.timestamp_increment_per_accumulation
if current_accum != send_accum:
out_item = await self._flush_accumulation(out_item, current_accum)
in_item.add_marker(self._proc_command_queue)
self.engine.free_in_item(in_item)
# When the stream is closed, if the sender loop is waiting for an out item,
# it will never exit. Upon receiving this NoneType, the sender_loop can
# stop waiting and exit.
logger.debug("gpu_proc_loop completed")
self._out_queue.put_nowait(None)
[docs]
async def sender_loop(self) -> None: # noqa: D102
# NOTE: The transfer from the GPU to the heap buffer and the sending onto
# the network could be pipelined a bit better, but this is not really
# required in this loop as this whole process occurs at a much slower
# pace than the rest of the pipeline.
old_time_s = time.time()
old_timestamp = 0
while True:
item = await self._out_queue.get()
if item is None:
break
await item.async_wait_for_events()
if not np.any(item.present_ants):
# All Antennas have missed data at some point, avoid sending altogether
logger.warning("All Antennas had a break in data during this accumulation")
skipped_accum_counter.labels(self.output.name).inc(1)
else:
heap = await self.send_stream.get_free_heap()
# NOTE: We do not expect the time between dumps to be the same each
# time as the time.time() function checks the wall time now, not
# the actual time between timestamps. The difference between dump
# timestamps is expected to be constant.
new_time_s = time.time()
time_difference_between_heaps_s = new_time_s - old_time_s
logger.debug(
"Current output heap timestamp: %#x, difference between timestamps: %#x, "
"wall time between dumps %.2f s",
item.timestamp,
item.timestamp - old_timestamp,
time_difference_between_heaps_s,
)
old_time_s = new_time_s
old_timestamp = item.timestamp
item.buffer_device.get_async(self._download_command_queue, heap.buffer)
item.saturated.get_async(self._download_command_queue, heap.saturated)
event = self._download_command_queue.enqueue_marker()
await katsdpsigproc.resource.async_wait_for_events([event])
if not np.all(item.present_ants):
incomplete_accum_counter.labels(self.output.name).inc(1)
heap.timestamp = item.timestamp
end_adc_timestamp = item.timestamp + self.timestamp_increment_per_accumulation
self.engine.sensors[f"{self.output.name}.tx.next-timestamp"].set_value(
end_adc_timestamp,
timestamp=self.engine.time_converter.adc_to_unix(item.timestamp),
)
if self.send_stream.send_enabled:
# Convert timestamp for the *end* of the heap (not the start)
# to a UNIX time for the sensor update. NB: this should be done
# *before* send_heap, because that gives away ownership of the
# heap.
end_timestamp = self.engine.time_converter.adc_to_unix(end_adc_timestamp)
clip_cnt_sensor = self.engine.sensors[f"{self.output.name}.xeng-clip-cnt"]
clip_cnt_sensor.set_value(clip_cnt_sensor.value + int(heap.saturated), timestamp=end_timestamp)
self.send_stream.send_heap(heap)
await self._out_free_queue.put(item)
await self.send_stream.send_stop_heap()
logger.debug("sender_loop completed")
[docs]
def capture_enable(self, *, stream_id: int, enable: bool = True, timestamp: int = 0) -> None: # noqa: D102
self.send_stream.send_enabled = enable
self.send_stream.send_enabled_timestamp = timestamp
[docs]
class XBEngine(Engine):
r"""GPU XB-Engine pipeline.
Currently the B-Engine functionality has not been added. This class currently
only creates an X-Engine pipeline.
The XB-Engine conducts the reception of SPEAD heaps from F-engines and makes
the data available to the constituent pipelines. In order to reduce the load
on the main thread, received data is collected into chunks. A chunk consists
of multiple batches of F-Engine heaps where a batch is a collection of heaps
from all F-Engine with the same timestamp.
There is a seperate function for sending descriptors onto the network.
Class initialisers allocate all memory buffers to be used during the lifetime
of the XBEngine object. These buffers are continuously reused to ensure
memory use remains constrained.
.. todo::
A lot of the sensors are common to both the F- and X-engines. It may be
worth investigating some kind of abstract base class for engines to build
on top of.
Parameters
----------
katcp_host
Hostname or IP on which to listen for KATCP C&M connections.
katcp_port
Network port on which to listen for KATCP C&M connections.
adc_sample_rate
Sample rate of the digitisers in the current array (in Hz). This value
is required to calculate the packet spacing of the output heaps. If it
is set incorrectly, the packet spacing could be too large causing the
pipeline to stall as heaps queue at the sender faster than they are
sent.
bandwidth
Total bandwidth across `n_channels` channels (in Hz). This is used in
delay calculations.
send_rate_factor
Configure the spead2 sender with a rate proportional to this factor.
This value is intended to dictate a data transmission rate slightly
higher/faster than the ADC rate. A factor of zero (0) tells the sender
to transmit as fast as possible.
n_ants
The number of antennas to be correlated.
n_channels
The total number of frequency channels out of the F-Engine.
n_channels_per_substream
The number of frequency channels contained in the incoming F-engine
data stream.
n_samples_between_spectra
The number of samples between frequency spectra received.
n_spectra_per_heap
The number of time samples received per frequency channel.
sample_bits
The number of bits per sample. Only 8 bits is supported at the moment.
sync_time
UNIX time corresponding to timestamp zero
channel_offset_value
The index of the first channel in the subset of channels processed by
this XB-Engine. Used to set the value in the XB-Engine output heaps for
spectrum reassembly by the downstream receiver.
outputs
Output streams to generate.
src
Endpoint for the incoming data.
recv_interface
IP address of the network device to use for input.
recv_ibv
Use ibverbs for input.
recv_affinity
Specific CPU core to assign the receive stream processing thread to.
recv_comp_vector
Completion vector for source stream, or -1 for polling.
See :class:`spead2.recv.UdpIbvConfig` for further information.
recv_buffer
The size of the network receive buffer.
heaps_per_fengine_per_chunk
The number of consecutive batches to store in the same chunk. The higher
this value is, the more GPU and system RAM is allocated, the lower,
the more work the Python processing thread is required to do.
recv_reorder_tol
Maximum tolerance for jitter between received packets, as a time
expressed in ADC sample ticks.
send_interface
IP address of the network device to use for output.
send_ttl
TTL for outgoing packets.
send_ibv
Use ibverbs for output.
send_packet_payload
Size for output packets (payload only; headers and padding are
added to this).
send_affinity
CPU core for output-handling thread.
send_comp_vector
Completion vector for transmission, or -1 for polling.
See :class:`spead2.send.UdpIbvConfig` for further information.
send_enabled
Start with correlator output transmission enabled, without having to
issue a katcp command.
monitor
:class:`Monitor` to use for generating multiple :class:`~asyncio.Queue`
objects needed to communicate between functions, and handling basic
reporting for :class:`~asyncio.Queue` sizes and events.
context
Device context for katsdpsigproc. It must be a CUDA device.
vkgdr_handle
Handle to vkgdr for the same device as `context`.
"""
VERSION = "katgpucbf-xbgpu-1.0"
def __init__(
self,
*,
katcp_host: str,
katcp_port: int,
adc_sample_rate: float,
bandwidth: float,
send_rate_factor: float,
n_ants: int,
n_channels: int,
n_channels_per_substream: int,
n_samples_between_spectra: int,
n_spectra_per_heap: int,
sample_bits: int,
sync_time: float,
channel_offset_value: int,
outputs: list[Output],
src: list[tuple[str, int]], # It's a list but it should be length 1 in xbgpu case.
recv_interface: str,
recv_ibv: bool,
recv_affinity: int,
recv_comp_vector: int,
recv_buffer: int,
send_interface: str,
send_buffer: int,
send_ttl: int,
send_ibv: bool,
send_packet_payload: int,
send_affinity: int,
send_comp_vector: int,
heaps_per_fengine_per_chunk: int, # Used for GPU memory tuning
recv_reorder_tol: int,
send_enabled: bool,
monitor: Monitor,
context: AbstractContext,
vkgdr_handle: vkgdr.Vkgdr,
):
super().__init__(katcp_host, katcp_port)
self._cancel_tasks: list[asyncio.Task] = [] # Tasks that need to be cancelled on shutdown
for output in outputs:
if channel_offset_value % n_channels_per_substream != 0:
raise ValueError(f"{output.name}: channel_offset must be an integer multiple of channels_per_substream")
# Array configuration parameters
self.adc_sample_rate = adc_sample_rate
self.bandwidth = bandwidth
self.time_converter = TimeConverter(sync_time, adc_sample_rate)
self.n_ants = n_ants
self.n_channels = n_channels
self.n_channels_per_substream = n_channels_per_substream
self.sample_bits = sample_bits
self.channel_offset_value = channel_offset_value
self._src = src
self._recv_interface = recv_interface
self._recv_ibv = recv_ibv
self._recv_buffer = recv_buffer
self._recv_comp_vector = recv_comp_vector
self.send_interface = send_interface
self.send_buffer = send_buffer
self.send_ttl = send_ttl
self.send_ibv = send_ibv
self.send_packet_payload = send_packet_payload
self.send_affinity = send_affinity
self.send_comp_vector = send_comp_vector
self.send_rate_factor = send_rate_factor
self.monitor = monitor
self.n_samples_between_spectra = n_samples_between_spectra
self.recv_heap_timestamp_step = n_samples_between_spectra * n_spectra_per_heap
self.populate_sensors(
self.sensors,
max(
RECV_SENSOR_TIMEOUT_MIN,
RECV_SENSOR_TIMEOUT_CHUNKS
* heaps_per_fengine_per_chunk
* self.recv_heap_timestamp_step
/ adc_sample_rate,
),
)
# Sets the number of batches of heaps to store per chunk
self.heaps_per_fengine_per_chunk = heaps_per_fengine_per_chunk
# NOTE: n_free_chunks dictates the number of buffers in system RAM.
# This can be set quite high as there is much more system RAM than GPU
# RAM. It should be higher than max_active_chunks.
# These values are not configurable as they have been acceptable for
# most tests cases up until now. If the pipeline starts bottlenecking,
# then maybe look at increasing these values.
self.max_active_chunks: int = (
math.ceil(recv_reorder_tol / self.recv_heap_timestamp_step / self.heaps_per_fengine_per_chunk) + 1
)
n_free_chunks: int = self.max_active_chunks + 8 # TODO: Abstract this 'naked' constant
data_ringbuffer = ChunkRingbuffer(
self.max_active_chunks, name="recv_data_ringbuffer", task_name=RECV_TASK_NAME, monitor=monitor
)
free_ringbuffer = spead2.recv.ChunkRingbuffer(n_free_chunks)
self.recv_layout = recv.Layout(
n_ants=n_ants,
n_channels_per_substream=n_channels_per_substream,
n_spectra_per_heap=n_spectra_per_heap,
sample_bits=self.sample_bits,
heap_timestamp_step=self.recv_heap_timestamp_step,
heaps_per_fengine_per_chunk=self.heaps_per_fengine_per_chunk,
)
self.receiver_stream = recv.make_stream(
layout=self.recv_layout,
data_ringbuffer=data_ringbuffer,
free_ringbuffer=free_ringbuffer,
recv_affinity=recv_affinity,
max_active_chunks=self.max_active_chunks,
)
# Prevent multiple chunks from being in flight in pipelines at the same
# time. This keeps the pipelines synchronised to avoid running out of
# RxQueueItems.
self._active_in_sem = asyncio.BoundedSemaphore(1)
self._pipelines: list[Pipeline] = []
x_outputs = [output for output in outputs if isinstance(output, XOutput)]
b_outputs = [output for output in outputs if isinstance(output, BOutput)]
self._pipelines = [XPipeline(x_output, self, context, vkgdr_handle, send_enabled) for x_output in x_outputs]
if b_outputs:
self._pipelines.append(BPipeline(b_outputs, self, context, vkgdr_handle, send_enabled))
self._upload_command_queue = context.create_command_queue()
# This queue is extended in the monitor class, allowing for the
# monitor to track the number of items on the queue.
# - The XBEngine passes items from the :meth:`_receiver_loop` to each
# pipeline via :meth:`.Pipeline.add_in_item`.
# - Once the each pipeline is finished with an :class:`InQueueItem`,
# it must pass it back to the _in_free_queue via
# :meth:`free_in_item` to ensure that all allocated buffers are in
# continuous circulation.
# NOTE: Too high means too much GPU memory gets allocate
self._in_free_queue: asyncio.Queue[InQueueItem] = monitor.make_queue("in_free_queue", DEFAULT_N_IN_ITEMS)
recv_data_shape = (
heaps_per_fengine_per_chunk,
n_ants,
n_channels_per_substream,
n_spectra_per_heap,
N_POLS,
COMPLEX,
)
for _ in range(DEFAULT_N_IN_ITEMS):
# TODO: NGC-1106 update buffer_device dtype once 4-bit mode is supported
buffer_device = accel.DeviceArray(context, recv_data_shape, dtype=np.int8)
present: NDArray[np.uint8] = np.zeros(shape=(self.heaps_per_fengine_per_chunk, n_ants), dtype=np.uint8)
in_item = InQueueItem(buffer_device, present)
self._in_free_queue.put_nowait(in_item)
for _ in range(n_free_chunks):
buf = buffer_device.empty_like()
present = np.zeros(n_ants * self.heaps_per_fengine_per_chunk, np.uint8)
chunk = recv.Chunk(data=buf, present=present, sink=self.receiver_stream)
chunk.recycle() # Make available to the stream
[docs]
def populate_sensors(self, sensors: aiokatcp.SensorSet, recv_sensor_timeout: float) -> None:
"""Define the sensors for an XBEngine."""
# Dynamic sensors
for sensor in base_recv.make_sensors(recv_sensor_timeout).values():
sensors.add(sensor)
[docs]
def free_in_item(self, item: InQueueItem) -> None:
"""Return an InQueueItem to the free queue if its refcount hits zero."""
item.refcount -= 1
if item.refcount == 0:
# All Pipelines are done with this item
self._active_in_sem.release()
# NOTE: Recycle the chunk only as resetting of the item is done
# when it is taken off the queue.
assert item.chunk is not None
item.chunk.recycle()
self._in_free_queue.put_nowait(item)
async def _add_in_item(self, item: InQueueItem) -> None:
"""Push an :class:`InQueueItem` to all the pipelines."""
await self._active_in_sem.acquire()
item.refcount = len(self._pipelines)
for pipeline in self._pipelines:
pipeline.add_in_item(item)
async def _receiver_loop(self) -> None:
"""
Receive heaps off of the network in a continuous loop.
This function does the following:
1. Wait for a chunk to be assembled on the receiver.
2. Get a free :class:`InQueueItem` off of the _in_free_queue.
3. Initiate the transfer of the chunk from system memory to the buffer
in GPU RAM that belongs to the in_item.
4. Place the in_item on _in_queue so that it can be processed downstream.
The above steps are performed in a loop until there are no more chunks to assembled.
"""
assert isinstance(self.receiver_stream.data_ringbuffer, ChunkRingbuffer)
async for chunk in recv.iter_chunks(
self.receiver_stream.data_ringbuffer,
self.recv_layout,
self.sensors,
self.time_converter,
):
# Get a free in_item that will contain the GPU buffer to which the
# received chunk will be transferred.
item = await self._in_free_queue.get()
# First wait for asynchronous GPU work on the buffer.
item.enqueue_wait_for_events(self._upload_command_queue)
item.reset()
# Now populate the fresh item
item.chunk = chunk
# Need a separate attribute as the chunk gets reset
item.timestamp = chunk.timestamp
# Need to reshape chunk.present to get Heaps in one dimension
item.present[:] = chunk.present.reshape(item.present.shape)
# Zero data affected by missing antennas
# TODO: NGC-1311 Update this once the region-based zeroing
# feature is implemented in katsdpsigproc.
for heap in range(self.heaps_per_fengine_per_chunk):
for antenna in range(self.n_ants):
if not item.present[heap, antenna]:
chunk.data[heap, antenna, ...] = 0
# Initiate transfer from received chunk to in_item buffer.
item.buffer_device.set_async(self._upload_command_queue, chunk.data)
item.add_marker(self._upload_command_queue)
# Give the received item to the pipelines' gpu_proc_loop.
await self._add_in_item(item)
# spead2 will (eventually) indicate that there are no chunks to async-for through
logger.debug("_receiver_loop completed")
for pipeline in self._pipelines:
pipeline.shutdown()
def _request_pipeline(self, stream_name: str) -> tuple[Pipeline, int]:
"""Get the pipeline related to the katcp request.
Return the first Pipeline that matches by name, as well as the index
of the data-stream it transmits.
Raises
------
FailReply
If the `stream_name` is not a known output.
"""
for pipeline in self._pipelines:
for i, output in enumerate(pipeline.outputs):
if stream_name == output.name:
return pipeline, i
raise aiokatcp.FailReply(f"No output stream called {stream_name!r}")
def _request_bpipeline(self, stream_name: str) -> tuple[BPipeline, int]:
"""Get the :class:`BPipeline` related to the katcp request.
This wraps :meth:`_request_bpipeline` to check that the requested
stream is a tied-array-channelised-voltage stream.
"""
pipeline, stream_id = self._request_pipeline(stream_name)
if not isinstance(pipeline, BPipeline):
raise aiokatcp.FailReply(f"Output {stream_name!r} is not a tied-array-channelised-voltage stream")
return pipeline, stream_id
[docs]
async def request_capture_start(self, ctx, stream_name: str, timestamp: int = 0) -> None:
"""Start transmission of stream.
Parameters
----------
stream_name
Output stream name.
timestamp
Minimum ADC timestamp at which to enable transmission.
"""
pipeline, stream_id = self._request_pipeline(stream_name)
pipeline.capture_enable(stream_id=stream_id, timestamp=timestamp)
[docs]
async def request_capture_stop(self, ctx, stream_name: str) -> None:
"""Stop transmission of a stream.
Parameters
----------
stream_name
Output stream name.
"""
pipeline, stream_id = self._request_pipeline(stream_name)
pipeline.capture_enable(stream_id=stream_id, enable=False)
[docs]
async def request_beam_weights(self, ctx, stream_name: str, *weights: float) -> None:
"""Set the weights for all inputs of a given beam and update the sensor.
Parameters
----------
stream_name
The beam to modify
weights
A sequence of real floating-point values (one per input).
"""
pipeline, stream_id = self._request_bpipeline(stream_name)
if len(weights) != self.n_ants:
raise aiokatcp.FailReply(f"Incorrect number of weights (expected {self.n_ants}, received {len(weights)})")
steady_state_timestamp = pipeline.set_weights(stream_id, np.array(weights))
self.sensors[f"{stream_name}.weight"].set_value(str(list(weights)), timestamp=steady_state_timestamp)
[docs]
async def request_beam_delays(self, ctx, stream_name: str, *delays: str) -> None:
"""Set the delays for all inputs of a given beam and update the sensor.
Parameters
----------
stream_name
The beam to modify
delays
A sequence of strings (one per input). Each string has the form
``delay:fringe-offset``, where ``delay`` is a delay in seconds, and
``fringe-offset`` is the net phase adjustment at the centre
frequency (of the whole stream, not of this engine).
"""
pipeline, stream_id = self._request_bpipeline(stream_name)
if len(delays) != self.n_ants:
raise aiokatcp.FailReply(f"Incorrect number of delays (expected {self.n_ants}, received {len(delays)})")
new_delays = np.empty((self.n_ants, 2), np.float64)
for i, entry in enumerate(delays):
delay_str, phase_str = entry.split(":")
new_delays[i, 0] = float(delay_str)
new_delays[i, 1] = float(phase_str)
steady_state_timestamp = pipeline.set_delays(stream_id, new_delays)
delays_formatted_str = ", ".join(str(value) for value in new_delays.flatten())
self.sensors[f"{stream_name}.delay"].set_value(
f"({steady_state_timestamp}, {delays_formatted_str})",
timestamp=steady_state_timestamp,
)
[docs]
async def request_beam_quant_gains(self, ctx, stream_name: str, gain: float) -> None:
"""Set the quantisation gain for a beam and update the sensor.
Parameters
----------
stream_name
The beam to modify.
gain
The new gain to apply.
"""
pipeline, stream_id = self._request_bpipeline(stream_name)
steady_state_timestamp = pipeline.set_quant_gain(stream_id, gain)
self.sensors[f"{stream_name}.quantiser-gain"].set_value(gain, timestamp=steady_state_timestamp)
[docs]
async def start(self, descriptor_interval_s: float = SPEAD_DESCRIPTOR_INTERVAL_S) -> None:
"""
Start the engine.
This function adds the receive, processing and transmit tasks onto the
event loop. It also adds a task to continuously send the descriptor
heap at an interval indicated by `descriptor_interval_s`.
These functions will loop forever and only exit once the XBEngine
receives a SIGINT or SIGTERM.
Parameters
----------
descriptor_interval_s
The interval used to dictate the 'engine sleep interval' between
sending the data descriptor.
"""
# Create the descriptor task first to ensure descriptor will be sent
# before any data makes its way through the pipeline.
for pipeline in self._pipelines:
# TODO: BPipeline needs to stagger the sending of descriptors
descriptor_sender = DescriptorSender(
pipeline.send_stream.stream,
pipeline.send_stream.descriptor_heap,
descriptor_interval_s,
substreams=range(pipeline.send_stream.stream.num_substreams),
)
descriptor_task = asyncio.create_task(
descriptor_sender.run(), name=f"{pipeline.name}.{DESCRIPTOR_TASK_NAME}"
)
self.add_service_task(descriptor_task, wait_on_stop=False)
base_recv.add_reader(
self.receiver_stream,
src=self._src,
interface=self._recv_interface,
ibv=self._recv_ibv,
comp_vector=self._recv_comp_vector,
buffer_size=self._recv_buffer,
)
self.add_service_task(
asyncio.create_task(self._receiver_loop(), name=RECV_TASK_NAME),
wait_on_stop=True,
)
for pipeline in self._pipelines:
proc_task = asyncio.create_task(
pipeline.gpu_proc_loop(),
name=f"{pipeline.name}.{GPU_PROC_TASK_NAME}",
)
self.add_service_task(proc_task, wait_on_stop=True)
send_task = asyncio.create_task(
pipeline.sender_loop(),
name=f"{pipeline.name}.{SEND_TASK_NAME}",
)
self.add_service_task(send_task, wait_on_stop=True)
await super().start()
[docs]
async def on_stop(self) -> None:
"""
Shut down processing when the device server is stopped.
This is called by aiokatcp after closing the listening socket.
"""
self.receiver_stream.stop()
await super().on_stop() # Waits for service tasks to complete
self._pipelines.clear() # Breaks circular references