Source code for katgpucbf.xbgpu.bsend

################################################################################
# Copyright (c) 2023-2026, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Module for sending tied array channelised voltage products onto the network."""

import asyncio
import functools
import logging
from collections.abc import Callable, Sequence
from typing import Final

import katsdpsigproc.accel as accel
import numpy as np
import spead2
import spead2.send.asyncio
from aiokatcp import SensorSet
from katsdpsigproc.abc import AbstractContext
from katsdptelstate.endpoint import Endpoint
from prometheus_client import Counter

from .. import COMPLEX, DEFAULT_PACKET_PAYLOAD_BYTES
from ..send import send_rate
from ..spead import BEAM_ANTS_ID, BF_RAW_ID, FLAVOUR, FREQUENCY_ID, IMMEDIATE_DTYPE, IMMEDIATE_FORMAT, TIMESTAMP_ID
from ..utils import TimeConverter
from . import METRIC_NAMESPACE
from .output import BOutput
from .send import Send

output_heaps_counter = Counter(
    "output_b_heaps", "number of B-engine heaps transmitted", ["stream"], namespace=METRIC_NAMESPACE
)
output_bytes_counter = Counter(
    "output_b_bytes", "number of B-engine payload bytes transmitted", ["stream"], namespace=METRIC_NAMESPACE
)
output_samples_counter = Counter(
    "output_b_samples", "number of complex beam samples transmitted", ["stream"], namespace=METRIC_NAMESPACE
)
output_clip_counter = Counter(
    "output_b_clipped_samples", "number of beam samples that were saturated", ["stream"], namespace=METRIC_NAMESPACE
)

logger = logging.getLogger(__name__)
# NOTE: ICD suggests `beng_out_bits_per_sample`,
# MK correlator doesn't make this configurable.
SEND_DTYPE = np.dtype(np.int8)


[docs] def make_item_group(bf_raw_shape: tuple[int, ...]) -> spead2.send.ItemGroup: """Create an item group (with no values).""" item_group = spead2.send.ItemGroup(flavour=FLAVOUR) item_group.add_item( TIMESTAMP_ID, "timestamp", "Timestamp provided by the MeerKAT digitisers and scaled to the digitiser sampling rate.", shape=[], format=IMMEDIATE_FORMAT, ) item_group.add_item( FREQUENCY_ID, "frequency", # Misleading name, but it's what the ICD specifies "Value of the first channel in collections stored here.", shape=[], format=IMMEDIATE_FORMAT, ) item_group.add_item( BEAM_ANTS_ID, "beam_ants", "Count of antennas included in the beam sum.", shape=[], format=IMMEDIATE_FORMAT, ) item_group.add_item( BF_RAW_ID, "bf_raw", "Beamformer output for frequency-domain beam.", shape=bf_raw_shape, dtype=SEND_DTYPE, ) return item_group
[docs] class Batch: """Hold all data for heaps with a single timestamp. It does not own its memory - the backing store is in :class:`Chunk`. It keeps a cached :class:`spead2.send.HeapReferenceList` with the heaps of the enabled beams, along with a tuple of the enabled beams. Parameters ---------- timestamp Zero-dimensional array of dtype ``>u8`` holding the timestamp data Payload data for the batch with shape (n_beams, n_channels_per_substream, spectra_per_heap, COMPLEX). channel_offset The first frequency channel processed. present_ants Zero-dimensional array of dtype ``>u8`` holding the number of antennas present in the Batch's input data. """ def __init__( self, timestamp: np.ndarray, data: np.ndarray, *, channel_offset: int, present_ants: np.ndarray, ) -> None: self.heaps: list[spead2.send.Heap] = [] self.data = data n_beams = data.shape[0] item_group = make_item_group(data.shape[1:]) # Get rid of the 'beam' dimension item_group[FREQUENCY_ID].value = channel_offset item_group[TIMESTAMP_ID].value = timestamp item_group[BEAM_ANTS_ID].value = present_ants for i in range(n_beams): item_group[BF_RAW_ID].value = self.data[i, ...] heap = item_group.get_heap(descriptors="none", data="all") heap.repeat_pointers = True self.heaps.append(heap) self.send_enabled = (False,) * n_beams self.send_heaps = spead2.send.HeapReferenceList([])
[docs] class Chunk: r""" An array of :class:`Batch`\ es. Parameters ---------- data Storage for tied-array-channelised-voltage data, with shape (n_batches, n_beams, n_channels_per_substream, n_spectra_per_heap, COMPLEX) and dtype :const:`SEND_DTYPE`. saturated Storage for saturation counts, with shape (n_beams,) and dtype uint32. channel_offset The first frequency channel processed. timestamp_step Timestamp step between successive :class:`Batch`\ es in a chunk. """ def __init__( self, data: np.ndarray, saturated: np.ndarray, *, channel_offset: int, timestamp_step: int, ) -> None: n_batches = data.shape[0] self.data = data self.saturated = saturated self._timestamp = 0 self._timestamp_step = timestamp_step self._timestamps = (np.arange(n_batches) * self._timestamp_step).astype(IMMEDIATE_DTYPE) self._present_ants = np.zeros(shape=(n_batches,), dtype=IMMEDIATE_DTYPE) # NOTE: The future indicates when it is safe to modify the chunk, # i.e. it is not being transmitted. At construction there is nothing to # wait for, so we mark it ready. self.future = asyncio.get_running_loop().create_future() self.future.set_result(None) self._batches = [ Batch( self._timestamps[i, ...], data[i], channel_offset=channel_offset, present_ants=self._present_ants[i, ...], ) for i in range(n_batches) ] @property def present_ants(self) -> np.ndarray: """ Number of antennas present in the current beam sums. This is a count for each :class:`Batch` in the chunk. Setting this property updates the immediate SPEAD items in the heaps. Much like :attr:`timestamp`, this should only be done when :attr:`future` is done. """ return self._present_ants @present_ants.setter def present_ants(self, value: np.ndarray) -> None: self._present_ants[:] = value @property def timestamp(self) -> int: """ Timestamp of the first heap. Setting this property updates the timestamps stored in all the heaps. This should only be done when :attr:`future` is done. """ return self._timestamp @timestamp.setter def timestamp(self, value: int) -> None: delta = value - self.timestamp self._timestamps += delta self._timestamp = value @staticmethod def _inc_counters( n_batches_sent: int, data_shape: tuple[int, int, int], data_dtype: np.dtype, enabled: Sequence[bool], output_names: Sequence[str], saturated: np.ndarray, sensors: SensorSet, sensor_timestamp: float, future: asyncio.Future, ) -> None: """Increment beam stream Prometheus counters. Intended to be used on a gathered set of futures as it is computationally expensive to increment Prometheus counters for each call to async_send_heaps. Parameters ---------- n_batches_sent The number of batches transmitted. data_shape The shape of the beam data being transmitted. Expected in the format of (n_channels_per_substream, samples_per_spectra, COMPLEX). data_dtype The `np.dtype` of the beam data transmitted. enabled Boolean flag array indicating which streams are enabled for transmission. output_names List of beam stream names saturated Saturation count for the chunk for each stream in `output_names`. sensors Server's katcp sensors. sensor_timestamp Timestamp (UNIX time) to use for sensor update. future Future returned by the spead2 stream's `async_send_heaps`. """ if future.cancelled() or future.exception() is not None: # Don't update output counters if we didn't successfully transmit the data. n_batches_sent = 0 # int casts are because np.prod returns np.int64 which is # incompatible with the type annotations for Prometheus. # Multiply across dimensions to get total bytes byte_count = int(np.prod(data_shape)) * data_dtype.itemsize * n_batches_sent # Multiply across the first two dimensions to get complex sample count sample_count = int(np.prod(data_shape[:-1])) * n_batches_sent for i, output_name in enumerate(output_names): clipped = int(saturated[i]) sensor = sensors[f"{output_name}.beng-clip-cnt"] sensor.set_value(sensor.value + clipped, timestamp=sensor_timestamp) if enabled[i] and n_batches_sent != 0: output_heaps_counter.labels(output_name).inc(n_batches_sent) output_bytes_counter.labels(output_name).inc(byte_count) output_samples_counter.labels(output_name).inc(sample_count) output_clip_counter.labels(output_name).inc(clipped)
[docs] def send( self, send_stream: "BSend", time_converter: TimeConverter, sensors: SensorSet, ) -> asyncio.Future: """ Transmit a chunk's heaps over a SPEAD stream. This method returns immediately and sends the data asynchronously. Before modifying the chunk, first await :attr:`future`. """ send_enabled = tuple( enabled and self.timestamp >= timestamp for enabled, timestamp in zip(send_stream.send_enabled, send_stream.send_enabled_timestamp, strict=True) ) n_enabled = sum(send_enabled) end_timestamp_adc = self._timestamp + self._timestamp_step * len(self._batches) timestamp_unix = time_converter.adc_to_unix(self._timestamp) # Update tx.next-timestamp immediately rather than in _inc_counters so # that it's synchronised with reading send_enabled. for output_name in send_stream.output_names: sensors[f"{output_name}.tx.next-timestamp"].set_value(end_timestamp_adc, timestamp=timestamp_unix) rate = send_stream.bytes_per_second_per_beam * n_enabled send_futures: list[asyncio.Future] = [] if n_enabled > 0: for batch, antenna_presence in zip(self._batches, self._present_ants, strict=True): if antenna_presence == 0: # No antennas were present in the received batch of heaps # This check takes priority as we do not transmit batches # that did not have any input data. The updating of the # batch's :class:`HeapReferenceList` is not time-critical. continue if batch.send_enabled != send_enabled: batch.send_heaps = spead2.send.HeapReferenceList( [ spead2.send.HeapReference(heap, substream_index=i, rate=rate) for i, (heap, enabled) in enumerate(zip(batch.heaps, send_enabled, strict=True)) if enabled ] ) batch.send_enabled = send_enabled send_futures.append( send_stream.stream.async_send_heaps(batch.send_heaps, mode=spead2.send.GroupMode.ROUND_ROBIN) ) self.future = asyncio.gather(*send_futures) else: # TODO: Is it necessary to handle this case? self.future = asyncio.create_task(send_stream.stream.async_flush()) end_timestamp_unix = time_converter.adc_to_unix(end_timestamp_adc) self.future.add_done_callback( functools.partial( self._inc_counters, len(send_futures), # Increment counters for as many calls to async_send_heaps # Get rid of 'batch' and 'beam' dimensions (ignore is because # mypy doesn't know the number of dimensions) self.data.shape[2:], # type: ignore[arg-type] self.data.dtype, send_enabled, send_stream.output_names, self.saturated.copy(), # Copy since the original may get overwritten sensors, end_timestamp_unix, ) ) return self.future
[docs] class BSend(Send): r""" Class for turning tied array channelised voltage products into SPEAD heaps. This class creates a queue of chunks that can be sent out onto the network. To obtain a chunk, call :meth:`get_free_chunk` - which will return a :class:`Chunk`. This object will create a limited number of transmit buffers and keep recycling them, avoiding any memory allocation at runtime. The transmission of a chunk's data is abstracted by :meth:`send_chunk`. This invokes transmission and immediately returns the :class:`Chunk` back to the queue for reuse. This object keeps track of each tied-array-channelised-voltage data stream by means of substreams in :class:`spead2.send.asyncio.AsyncStream`, allowing for individual enabling and disabling of the data product. To allow this class to be used with multiple transports, the constructor takes a factory function to create the stream. Parameters ---------- outputs Sequence of :class:`.output.BOutput`. batches_per_chunk Number of :class:`Batch`\ es in each transmitted :class:`Chunk`. n_chunks Number of :class:`Chunk`\ s to create. adc_sample_rate, n_channels, n_channels_per_substream, spectra_per_heap, channel_offset See :class:`.XBEngine` for further information. timestamp_step The timestamp step between successive heaps. send_rate_factor Factor dictating how fast the send-stream should transmit data. context Device context to create buffers. stream_factory Callback function to create the spead2 send stream. It is passed the stream configuration and memory buffers. packet_payload Size, in bytes, for the output packets (tied array channelised voltage payload only; headers and padding are added to this). send_enabled Enable/Disable transmission. """ descriptor_heap: spead2.send.Heap preamble_size: Final[int] = 72 def __init__( self, outputs: Sequence[BOutput], batches_per_chunk: int, n_chunks: int, n_channels: int, n_channels_per_substream: int, spectra_per_heap: int, adc_sample_rate: float, timestamp_step: int, send_rate_factor: float, channel_offset: int, context: AbstractContext, stream_factory: Callable[[spead2.send.StreamConfig, Sequence[np.ndarray]], "spead2.send.asyncio.AsyncStream"], packet_payload: int = DEFAULT_PACKET_PAYLOAD_BYTES, send_enabled: bool = False, ) -> None: self.send_enabled = [send_enabled] * len(outputs) self.send_enabled_timestamp = [0] * len(outputs) n_beams = len(outputs) self.output_names = [output.name for output in outputs] self._chunks_queue: asyncio.Queue[Chunk] = asyncio.Queue() buffers: list[np.ndarray] = [] send_shape = (batches_per_chunk, n_beams, n_channels_per_substream, spectra_per_heap, COMPLEX) for _ in range(n_chunks): chunk = Chunk( accel.HostArray(send_shape, SEND_DTYPE, context=context), accel.HostArray((n_beams,), np.uint32, context=context), channel_offset=channel_offset, timestamp_step=timestamp_step, ) self._chunks_queue.put_nowait(chunk) buffers.append(chunk.data) heap_payload_size_bytes = n_channels_per_substream * spectra_per_heap * COMPLEX * SEND_DTYPE.itemsize self.bytes_per_second_per_beam = send_rate( packet_header=BSend.preamble_size, packet_payload=packet_payload, heap_payload=heap_payload_size_bytes, heap_interval=timestamp_step / adc_sample_rate, send_rate_factor=send_rate_factor, ) stream_config = spead2.send.StreamConfig( max_packet_size=packet_payload + BSend.preamble_size, # + 1 below for the descriptor per beam max_heaps=(n_chunks * batches_per_chunk + 1) * n_beams, rate_method=spead2.send.RateMethod.AUTO, ) item_group = make_item_group(buffers[0].shape[2:]) super().__init__( n_channels=n_channels, n_channels_per_substream=n_channels_per_substream, channel_offset=channel_offset, stream=stream_factory(stream_config, buffers), descriptor_heap=item_group.get_heap(descriptors="all", data="none"), )
[docs] def enable_beam(self, beam_id: int, enable: bool = True, timestamp: int = 0) -> None: """Enable/Disable a beam's data transmission. :class:`.BSend` operates as a single, large stream with multiple substreams. Each substream (beam) is its own data product and is required to be enabled/disabled independently. Parameters ---------- beam_id Index of the beam's data product. enable Boolean indicating whether the `beam_id` should be enabled or disabled. timestamp Minimum timestamp to transmit when enabled. """ self.send_enabled[beam_id] = enable self.send_enabled_timestamp[beam_id] = timestamp
[docs] async def get_free_chunk(self) -> Chunk: """Obtain a :class:`.Chunk` for transmission. We await the chunk's :attr:`future` to be sure we are not overwriting data that is still being transmitted. If sending failed, it is no longer being transmitted, and therefore safe to return the chunk. Raises ------ asyncio.CancelledError If the chunk's send future is cancelled. """ chunk = await self._chunks_queue.get() try: await chunk.future except asyncio.CancelledError: raise except Exception: logger.exception("Error sending chunk") return chunk
[docs] def send_chunk(self, chunk: Chunk, time_converter: TimeConverter, sensors: SensorSet) -> None: """Send a chunk's data and put it on the :attr:`_chunks_queue`.""" chunk.send(self, time_converter, sensors) self._chunks_queue.put_nowait(chunk)
[docs] async def send_stop_heap(self) -> None: """Send a Stop Heap over the spead2 transport.""" stop_heap = spead2.send.Heap(FLAVOUR) stop_heap.add_end() # Flush just to ensure that we don't overflow the stream's queue. # It's a heavy-handed approach, but we don't care about performance # during shutdown. await self.stream.async_flush() for i in range(len(self.output_names)): await self.stream.async_send_heap(stop_heap, substream_index=i)
[docs] def make_stream( *, output_names: list[str], endpoints: list[Endpoint], interface: str, ttl: int, use_ibv: bool, affinity: int, buffer_size: int, comp_vector: int, stream_config: spead2.send.StreamConfig, buffers: Sequence[np.ndarray], ) -> "spead2.send.asyncio.AsyncStream": """Create asynchronous SPEAD stream for transmission. This is architected to be a single send stream with multiple substreams, each corresponding to a tied-array-channelised-voltage output data product. The `endpoints` need not be a contiguous list of multicast addresses. """ stream: spead2.send.asyncio.AsyncStream thread_pool = spead2.ThreadPool(1, [] if affinity < 0 else [affinity]) if use_ibv: stream = spead2.send.asyncio.UdpIbvStream( thread_pool, stream_config, spead2.send.UdpIbvConfig( endpoints=[(ep.host, ep.port) for ep in endpoints], interface_address=interface, ttl=ttl, comp_vector=comp_vector, memory_regions=list(buffers), ), ) else: stream = spead2.send.asyncio.UdpStream( thread_pool, [(ep.host, ep.port) for ep in endpoints], stream_config, interface_address=interface, ttl=ttl, buffer_size=buffer_size, ) # Referencing the labels causes them to be created, in advance of data # actually being transmitted. for output_name in output_names: output_heaps_counter.labels(output_name) output_bytes_counter.labels(output_name) output_samples_counter.labels(output_name) output_clip_counter.labels(output_name) return stream