Source code for katgpucbf.fgpu.send

################################################################################
# Copyright (c) 2020-2026, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Network transmission handling."""

import asyncio
import functools
from collections.abc import Callable, Sequence
from typing import Required, TypedDict, Unpack

import numpy as np
import spead2.send
import spead2.send.asyncio
from aiokatcp import SensorSet
from katsdptelstate.endpoint import Endpoint
from prometheus_client import Counter

from .. import COMPLEX, N_POLS
from ..send import send_rate
from ..spead import FENG_ID_ID, FENG_RAW_ID, FLAVOUR, FREQUENCY_ID, IMMEDIATE_DTYPE, IMMEDIATE_FORMAT, TIMESTAMP_ID
from ..utils import TimeConverter
from . import METRIC_NAMESPACE

#: Number of non-payload bytes per packet (header and 8 item pointers)
PREAMBLE_SIZE = 72
# Prometheus metrics for the output streams. Each is labelled by "stream",
# which is the output name passed to make_streams / Chunk.send.
# Heaps sent successfully (incremented once the send future completes).
output_heaps_counter = Counter("output_heaps", "number of heaps transmitted", ["stream"], namespace=METRIC_NAMESPACE)
# Payload bytes sent successfully (the nbytes of each sent batch's data).
output_bytes_counter = Counter(
    "output_bytes", "number of payload bytes transmitted", ["stream"], namespace=METRIC_NAMESPACE
)
# Complex samples sent successfully (the element count of each sent batch's data).
output_samples_counter = Counter(
    "output_samples", "number of complex samples transmitted", ["stream"], namespace=METRIC_NAMESPACE
)
# Heaps belonging to batches that were not marked present and so were not sent.
skipped_heaps_counter = Counter(
    "output_skipped_heaps", "heaps not sent because input data was incomplete", ["stream"], namespace=METRIC_NAMESPACE
)
# Saturated-sample counts, additionally labelled by polarisation index.
output_clip_counter = Counter(
    "output_clipped_samples", "number of samples that were saturated", ["stream", "pol"], namespace=METRIC_NAMESPACE
)


class _FengRawKwargs(TypedDict, total=False):
    """Keyword arguments describing the ``feng_raw`` item.

    Used to type the ``**feng_raw_kwargs`` accepted by :func:`make_item_group`,
    which forwards them unchanged to ``add_item``. Only ``shape`` is required;
    callers supply either ``dtype`` or ``format`` for the element type.
    """

    # Shape of the feng_raw array in a single heap
    shape: Required[tuple[int, ...]]
    # numpy element type, used when numpy has a suitable type
    dtype: np.dtype
    # SPEAD format (e.g. [("i", sample_bits)]), used when no numpy dtype fits
    format: list[tuple[str, int]]


def make_item_group(**feng_raw_kwargs: Unpack[_FengRawKwargs]) -> spead2.send.ItemGroup:
    """Build the SPEAD item group for F-engine output heaps, without values.

    The timestamp, F-engine ID and first-channel items are scalar items using
    the immediate format. The ``feng_raw`` payload item takes its ``shape``
    and its ``dtype`` or ``format`` from `feng_raw_kwargs`.
    """
    group = spead2.send.ItemGroup(flavour=FLAVOUR)
    # Scalar metadata items, added (in this order) before the payload item.
    scalar_items = (
        (
            TIMESTAMP_ID,
            "timestamp",
            "Timestamp provided by the MeerKAT digitisers and scaled to the digitiser sampling rate.",
        ),
        (
            FENG_ID_ID,
            "feng_id",
            "Uniquely identifies the F-Engine source for the data.",
        ),
        (
            FREQUENCY_ID,
            "frequency",
            "Identifies the first channel in the band of frequencies in the SPEAD heap.",
        ),
    )
    for item_id, item_name, description in scalar_items:
        group.add_item(item_id, item_name, description, shape=(), format=IMMEDIATE_FORMAT)
    group.add_item(
        FENG_RAW_ID,
        "feng_raw",
        "Channelised complex data from both polarisations of digitiser associated with F-Engine.",
        **feng_raw_kwargs,
    )
    return group
class Batch:
    """Holds all the heaps for a single timestamp.

    It does not own its memory - the backing store is in :class:`Chunk`.
    One heap is built per substream, each covering an equal, contiguous range
    of channels.

    Parameters
    ----------
    timestamp
        Zero-dimensional array of dtype ``>u8`` holding the timestamp. The
        array itself (not a scalar copy) is assigned to the ``timestamp``
        item, so that the owner can update it in place (as
        :attr:`Chunk.timestamp` does) without rebuilding the heaps.
    data
        Payload data for the batch, of shape (channels, spectra_per_heap, N_POLS).
    saturated
        Saturation data for the batch, of shape (N_POLS,)
    feng_id
        Value to put in ``feng_id`` SPEAD item
    n_substreams
        Number of substreams into which the channels are divided

    Raises
    ------
    ValueError
        If `n_substreams` is not positive or does not divide into the number
        of channels, or if a substream's slice of `data` is not C-contiguous.
    """

    def __init__(
        self, timestamp: np.ndarray, data: np.ndarray, saturated: np.ndarray, *, n_substreams: int, feng_id: int
    ) -> None:
        n_channels = data.shape[0]
        # Explicit checks rather than assert, which is stripped under -O.
        if n_substreams <= 0:
            raise ValueError("n_substreams must be positive")
        if n_channels % n_substreams != 0:
            raise ValueError("n_substreams must divide into n_channels")
        n_channels_per_substream = n_channels // n_substreams
        self.heaps: list[spead2.send.HeapReference] = []
        self.data = data
        self.saturated = saturated
        item_group = make_item_group(shape=(n_channels_per_substream,) + data.shape[1:], dtype=data.dtype)
        # Timestamp and F-engine ID are shared by every heap in the batch.
        item_group[TIMESTAMP_ID].value = timestamp
        item_group[FENG_ID_ID].value = feng_id
        for i in range(n_substreams):
            start_channel = i * n_channels_per_substream
            heap_data = data[start_channel : start_channel + n_channels_per_substream]
            if not heap_data.flags.c_contiguous:
                raise ValueError("Heap data must be contiguous")
            item_group[FREQUENCY_ID].value = start_channel
            item_group[FENG_RAW_ID].value = heap_data
            # Descriptors are not included here; make_descriptor_heap builds them.
            heap = item_group.get_heap(data="all", descriptors="none")
            # Every packet carries all the item pointers (see PREAMBLE_SIZE).
            heap.repeat_pointers = True
            self.heaps.append(spead2.send.HeapReference(heap, substream_index=i))
def _multi_send(
    streams: list["spead2.send.asyncio.AsyncStream"], heaps: list[spead2.send.HeapReference]
) -> asyncio.Future:
    """Send a list of heaps across several streams.

    The heaps are split into ``len(streams)`` contiguous runs of near-equal
    length, and run ``k`` is sent on ``streams[k]`` as a single round-robin
    group.

    The result is a future, not a coroutine, so it must not be passed to
    :func:`asyncio.create_task`.
    """
    mode = spead2.send.GroupMode.ROUND_ROBIN
    n_streams = len(streams)
    if n_streams == 1:
        # Fast path for the usual single-stream configuration.
        return streams[0].async_send_heaps(heaps, mode)
    n_heaps = len(heaps)
    # heaps[bounds[k]:bounds[k + 1]] is the run assigned to streams[k].
    bounds = [k * n_heaps // n_streams for k in range(n_streams + 1)]
    return asyncio.gather(
        *(
            stream.async_send_heaps(heaps[lo:hi], mode)
            for stream, lo, hi in zip(streams, bounds[:-1], bounds[1:], strict=True)
        )
    )
class Chunk:
    """An array of batches, spanning multiple timestamps.

    Parameters
    ----------
    data
        Storage for voltage data, with shape (n_batches, n_channels,
        n_spectra_per_heap, N_POLS) and a dtype returned by
        :func:`.gaussian_dtype`.
    saturated
        Storage for saturation counts, with shape (n_batches, N_POLS) and
        dtype uint32.
    n_substreams
        Number of substreams over which the data will be divided (must divide
        evenly into the number of channels).
    feng_id
        F-Engine ID to place in the SPEAD heaps
    spectra_samples
        Difference in timestamps between successive spectra. Successive
        batches therefore differ in timestamp by
        ``n_spectra_per_heap * spectra_samples``.

    Raises
    ------
    ValueError
        If `n_substreams` does not divide into the number of channels.
    """

    def __init__(
        self,
        data: np.ndarray,
        saturated: np.ndarray,
        *,
        n_substreams: int,
        feng_id: int,
        spectra_samples: int,
    ) -> None:
        n_batches = data.shape[0]
        n_channels = data.shape[1]
        n_spectra_per_heap = data.shape[2]
        if n_channels % n_substreams != 0:
            raise ValueError("n_substreams must divide into n_channels")
        self.data = data
        self.saturated = saturated
        #: Whether each batch has valid data
        self.present = np.zeros(n_batches, dtype=bool)
        #: Timestamp of the first heap
        self._timestamp = 0
        #: Callback to return the chunk to the appropriate queue
        self.cleanup: Callable[[], None] | None = None
        # Timestamp difference between successive batches
        self._timestamp_step = n_spectra_per_heap * spectra_samples
        #: Storage for timestamps in the SPEAD heaps.
        self._timestamps = (np.arange(n_batches) * self._timestamp_step).astype(IMMEDIATE_DTYPE)
        # The ... in indexing causes numpy to give a 0d array view, rather than
        # a scalar.
        self._batches = [
            Batch(self._timestamps[i, ...], data[i], saturated[i], feng_id=feng_id, n_substreams=n_substreams)
            for i in range(n_batches)
        ]

    @property
    def timestamp(self) -> int:
        """Timestamp of the first heap.

        Setting this property updates the timestamps stored in all the heaps
        (batch ``i`` gets ``timestamp + i * n_spectra_per_heap * spectra_samples``).
        The timestamp may move forwards or backwards, but must not be
        negative. This should not be done while a previous call to
        :meth:`send` is still in progress.
        """
        return self._timestamp

    @timestamp.setter
    def timestamp(self, value: int) -> None:
        if value < 0:
            raise ValueError("timestamp must be non-negative")
        delta = value - self._timestamp
        # Update in place: the batches hold 0d views into self._timestamps.
        # The storage dtype is unsigned (``>u8`` per the Batch docs), and NumPy
        # refuses to add a negative Python int to an unsigned array in place
        # (OverflowError with NumPy 2, a casting error with NumPy 1.x). So
        # moving backwards subtracts the magnitude instead. It cannot wrap,
        # because every new timestamp is value + i * step >= 0.
        if delta >= 0:
            self._timestamps += delta
        else:
            self._timestamps -= -delta
        self._timestamp = value

    @staticmethod
    def _inc_counters(batch: Batch, output_name: str, future: asyncio.Future) -> None:
        """Update the output metrics for `batch` once its send `future` is done.

        Nothing is counted if the send was cancelled or raised an exception.
        """
        if not future.cancelled() and future.exception() is None:
            output_heaps_counter.labels(output_name).inc(len(batch.heaps))
            output_bytes_counter.labels(output_name).inc(batch.data.nbytes)
            output_samples_counter.labels(output_name).inc(batch.data.size)
            for pol in range(N_POLS):
                output_clip_counter.labels(output_name, pol).inc(batch.saturated[pol])

    async def send(
        self,
        streams: list["spead2.send.asyncio.AsyncStream"],
        batches: int,
        time_converter: TimeConverter,
        sensors: SensorSet,
        output_name: str,
    ) -> None:
        """Transmit heaps over SPEAD streams.

        Batches from 0 to `batches` - 1 are sent asynchronously. The contents
        of each batch are distributed over the streams. If the number of
        streams does not divide into the number of destination endpoints,
        there will be imbalances, because the partitioning is the same for
        every batch.

        Batches whose :attr:`present` flag is false are not sent; their heaps
        are counted as skipped instead. After all sends complete, each input's
        ``feng-clip-cnt`` sensor is incremented by the saturation counts of
        the batches that were sent.
        """
        futures = []
        saturated = [0] * N_POLS
        for present, batch in zip(self.present[:batches], self._batches[:batches], strict=True):
            if present:
                futures.append(_multi_send(streams, batch.heaps))
                # Metrics are updated only when (and if) the send succeeds.
                futures[-1].add_done_callback(functools.partial(self._inc_counters, batch, output_name))
                for pol in range(N_POLS):
                    # Cast np.uint32 to Python int to avoid overflow
                    saturated[pol] += int(batch.saturated[pol])
            else:
                skipped_heaps_counter.labels(output_name).inc(len(batch.heaps))
        if futures:
            await asyncio.gather(*futures)
        # NOTE(review): this uses the full chunk length even when `batches` is
        # smaller than it; confirm whether the sensor timestamp should instead
        # be the end of the last batch considered.
        end_timestamp = self._timestamp + self._timestamp_step * len(self._batches)
        end_time = time_converter.adc_to_unix(end_timestamp)
        for pol in range(N_POLS):
            sensor = sensors[f"{output_name}.input{pol}.feng-clip-cnt"]
            sensor.set_value(sensor.value + saturated[pol], timestamp=end_time)
def make_streams(
    *,
    output_name: str,
    thread_pool: spead2.ThreadPool,
    endpoints: list[Endpoint],
    interfaces: list[str],
    ttl: int,
    ibv: bool,
    packet_payload: int,
    comp_vector: int,
    buffer_size: int,
    bandwidth: float,
    send_rate_factor: float,
    feng_id: int,
    n_ants: int,
    n_data_heaps: int,
    chunks: Sequence[Chunk],
) -> list["spead2.send.asyncio.AsyncStream"]:
    """Create asynchronous SPEAD streams for transmission.

    Each stream is configured with substreams for all the end-points. They
    differ only in the network interface used (there is one per interface).
    Thus, they can be used interchangeably for load-balancing purposes.

    The overall send rate and `buffer_size` are divided equally between the
    streams. With `ibv`, the chunks' data arrays are passed to each stream as
    memory regions.
    """
    bytes_per_value = chunks[0].data.dtype.itemsize  # size of one complex value
    memory_regions: list[object] = [chunk.data for chunk in chunks]
    heap_payload = sum(chunk.data.nbytes for chunk in chunks) // n_data_heaps
    # Work backwards from the payload byte rate (N_POLS * bandwidth * bytes
    # per value) to the time occupied by one heap.
    heap_interval = heap_payload / (N_POLS * bandwidth * bytes_per_value)
    total_rate = send_rate(
        packet_header=PREAMBLE_SIZE,
        packet_payload=packet_payload,
        heap_payload=heap_payload,
        heap_interval=heap_interval,
        send_rate_factor=send_rate_factor,
    )
    config = spead2.send.StreamConfig(
        rate=total_rate / len(interfaces),
        max_packet_size=packet_payload + PREAMBLE_SIZE,
        # Adding len(endpoints) to accommodate descriptors sent for each substream
        max_heaps=n_data_heaps + len(endpoints),
    )

    destinations = [(ep.host, ep.port) for ep in endpoints]
    per_stream_buffer = buffer_size // len(interfaces)
    streams: list[spead2.send.asyncio.AsyncStream]
    if ibv:
        # Build every ibverbs config before opening any stream.
        ibv_configs = [
            spead2.send.UdpIbvConfig(
                endpoints=destinations,
                interface_address=interface,
                ttl=ttl,
                comp_vector=comp_vector,
                memory_regions=memory_regions,
                buffer_size=per_stream_buffer,
            )
            for interface in interfaces
        ]
        streams = [spead2.send.asyncio.UdpIbvStream(thread_pool, config, cfg) for cfg in ibv_configs]
    else:
        streams = [
            spead2.send.asyncio.UdpStream(
                thread_pool,
                destinations,
                config,
                ttl=ttl,
                interface_address=interface,
                buffer_size=per_stream_buffer,
            )
            for interface in interfaces
        ]

    for index, stream in enumerate(streams):
        # Keep heap IDs distinct between streams and between F-engines: the
        # stream index goes above bit 40 (so at most 256 interfaces), and IDs
        # only repeat after 2^40/n_ants heaps, far beyond a receiver's window.
        stream.set_cnt_sequence((index << 40) + feng_id, n_ants)

    # Touch the label sets now so the metrics exist before any data is sent.
    for counter in (output_heaps_counter, output_bytes_counter, output_samples_counter, skipped_heaps_counter):
        counter.labels(output_name)
    for pol in range(N_POLS):
        output_clip_counter.labels(output_name, pol)
    return streams
def make_descriptor_heap(
    *,
    channels_per_substream: int,
    spectra_per_heap: int,
    sample_bits: int,
) -> spead2.send.Heap:
    """Create a descriptor heap for output F-Engine data.

    ``feng_raw`` is described with shape
    (channels_per_substream, spectra_per_heap, N_POLS, COMPLEX). Its element
    type is the numpy signed integer dtype with `sample_bits` bits when numpy
    has one (e.g. 8 or 16). Otherwise it is the SPEAD format
    ``[("i", sample_bits)]``.
    """
    shape = (channels_per_substream, spectra_per_heap, N_POLS, COMPLEX)
    raw_kwargs: _FengRawKwargs
    try:
        sample_dtype = np.dtype(f"int{sample_bits}")
    except TypeError:
        # numpy has no integer dtype of this width, so describe it with a
        # SPEAD format instead.
        raw_kwargs = {"shape": shape, "format": [("i", sample_bits)]}
    else:
        raw_kwargs = {"shape": shape, "dtype": sample_dtype}
    return make_item_group(**raw_kwargs).get_heap(descriptors="all", data="none")