# Source code for katgpucbf.dsim.send

################################################################################
# Copyright (c) 2021-2026, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Transmission of SPEAD data."""

import asyncio
import functools
import ipaddress
import itertools
import time
from collections.abc import Iterable, Sequence
from typing import Self

import numpy as np
import spead2.send.asyncio
import xarray as xr
from prometheus_client import Counter, Gauge

from .. import BYTE_BITS, spead
from ..utils import TimeConverter
from . import METRIC_NAMESPACE
from .shared_array import SharedArray

# Prometheus metrics describing the transmitted stream. They are updated by
# Sender._update_metrics, which runs as a done-callback each time a batch of
# heaps finishes sending.
output_heaps_counter = Counter("output_heaps", "number of heaps transmitted", namespace=METRIC_NAMESPACE)
output_bytes_counter = Counter("output_bytes", "number of payload bytes transmitted", namespace=METRIC_NAMESPACE)
time_error_gauge = Gauge("time_error_s", "elapsed time minus expected elapsed time", namespace=METRIC_NAMESPACE)


class HeapSet:
    """Collection of heaps making up a signal.

    The heaps are split into two parts, each of which is preprocessed to
    allow efficient transmission.

    This class should normally be constructed with :meth:`create`.

    Parameters
    ----------
    data
        An xarray data set with the following variables:

        timestamps
            1D array of timestamps, big-endian 64-bit
        digitiser_status
            2D array of digitiser status values, big-endian 64-bit (indexed
            by polarisation and time)
        payload
            3D array of raw sample data (indexed by polarisation, time and
            position within the heap)
        heaps
            Heaps referencing the timestamps and payload

        The dimensions must be ``time``, ``pol`` and ``data``.

    Raises
    ------
    ValueError
        If the ``time`` dimension has fewer than 2 elements (the data could
        not be split into two non-empty parts).
    """

    def __init__(self, data: xr.Dataset) -> None:
        n_times = data.sizes["time"]
        if n_times < 2:
            raise ValueError("time dimension must have at least 2 elements")
        middle = n_times // 2
        self.data = data
        # Two views along the time axis: [0, middle) and [middle, end).
        self.parts = [data.isel(time=np.s_[:middle]), data.isel(time=np.s_[middle:])]
        for part in self.parts:
            # Flatten the (time, pol) grid of heap references into a single
            # list that can be handed to the stream in one call.
            part.attrs["heap_reference_list"] = spead2.send.HeapReferenceList(part["heaps"].data.ravel().tolist())

    @classmethod
    def create(
        cls, timestamps: np.ndarray, n_substreams: Sequence[int], heap_size: int, digitiser_id: Sequence[int]
    ) -> Self:
        """Create from shape parameters.

        Parameters
        ----------
        timestamps
            The timestamp array to associate with the :class:`HeapSet` (must be
            big-endian 64-bit).
        n_substreams
            Number of substreams to distribute the heaps across, per polarisation
        heap_size
            Number of bytes of payload per heap
        digitiser_id
            Digitiser ID to insert into the packets, per polarisation (LSB should
            indicate polarisation)

        Raises
        ------
        ValueError
            If `n_substreams` and `digitiser_id` do not have the same length.
        """
        # Explicit check rather than `assert`, so that it is still enforced
        # when Python runs with optimisation (-O) enabled.
        if len(n_substreams) != len(digitiser_id):
            raise ValueError("n_substreams and digitiser_id must have the same length")
        n_pols = len(n_substreams)
        # TODO: make sure that this uses huge pages, as that is more
        # efficient for ibverbs.
        n = len(timestamps)
        shared_payload = SharedArray.create("dsim_payload", (n_pols, n, heap_size), np.uint8)
        payload = shared_payload.buffer
        heaps = []
        # substream_offset[j] is the index of the first substream belonging
        # to polarisation j.
        substream_offset = list(itertools.accumulate(n_substreams, initial=0))
        digitiser_id_items = [spead.make_immediate(spead.DIGITISER_ID_ID, dig_id) for dig_id in digitiser_id]
        digitiser_status = np.zeros((n_pols, n), dtype=spead.IMMEDIATE_DTYPE)
        for i in range(n):
            # The ... in indexing causes numpy to give a 0d array view, rather than
            # a scalar.
            heap_timestamp = timestamps[i, ...]
            cur_heaps = []
            # The timestamp item is shared by the heaps of all polarisations
            # at this time index.
            timestamp_item = spead.make_immediate(spead.TIMESTAMP_ID, heap_timestamp)
            for j in range(n_pols):
                heap_status = digitiser_status[j, i, ...]
                digitiser_status_item = spead.make_immediate(spead.DIGITISER_STATUS_ID, heap_status)
                heap_payload = payload[j, i]
                heap = spead2.send.Heap(spead.FLAVOUR)
                heap.add_item(timestamp_item)
                heap.add_item(digitiser_id_items[j])
                heap.add_item(digitiser_status_item)
                heap.add_item(
                    spead2.Item(
                        spead.ADC_SAMPLES_ID,
                        "",
                        "",
                        shape=heap_payload.shape,
                        dtype=heap_payload.dtype,
                        value=heap_payload,
                    )
                )
                heap.repeat_pointers = True
                # Heaps of polarisation j cycle through that polarisation's
                # own range of substreams.
                substream_index = substream_offset[j] + i % n_substreams[j]
                cur_heaps.append(spead2.send.HeapReference(heap, substream_index=substream_index))
            heaps.append(cur_heaps)
        data = xr.Dataset(
            {
                "timestamps": (["time"], timestamps),
                "payload": (["pol", "time", "data"], payload, {"shared_array": shared_payload}),
                "heaps": (["time", "pol"], heaps),
                "digitiser_status": (["pol", "time"], digitiser_status),
            }
        )
        return cls(data)
def _is_multicast(address: str) -> bool:
    """Report whether `address` is a multicast IP address.

    A string that cannot be parsed as an IP address is assumed to be a DNS
    name, and DNS names are assumed to resolve to unicast addresses, so such
    strings yield ``False``.
    """
    try:
        parsed = ipaddress.ip_address(address)
    except ValueError:
        # Not an IP literal: treat it as a (unicast) host name.
        return False
    return parsed.is_multicast
def make_stream_base(
    *,
    config: spead2.send.StreamConfig,
    endpoints: Iterable[tuple[str, int]],
    ttl: int,
    interface_address: str,
    ibv: bool = False,
    affinity: int = -1,
    comp_vector: int = 0,
    buffer_size: int,
    memory_regions: list | None = None,
) -> "spead2.send.asyncio.AsyncStream":
    """Create a spead2 stream for sending.

    This is the low-level support for making either a data or a descriptor
    stream. Refer to :func:`make_stream` for explanations of the arguments.

    The kind of stream depends on the arguments:

    - If `ibv` is true, an ibverbs stream is created. `memory_regions` (if
      given) is attached to its configuration. `buffer_size` is not passed
      to the ibverbs configuration.
    - Otherwise, if any endpoint address is a multicast IP address, a UDP
      stream is created with `ttl`, `interface_address` and `buffer_size`.
    - Otherwise a plain UDP stream is created with `buffer_size`; `ttl` and
      `interface_address` are not used.
    """
    # Materialise once: `endpoints` may be a one-shot iterable, and it is
    # both inspected and passed on below.
    endpoints_list = list(endpoints)
    thread_pool = spead2.ThreadPool(1, [] if affinity < 0 else [affinity])
    if ibv:
        ibv_config = spead2.send.UdpIbvConfig(
            endpoints=endpoints_list,
            interface_address=interface_address,
            ttl=ttl,
            comp_vector=comp_vector,
        )
        if memory_regions is not None:
            ibv_config.memory_regions = memory_regions
        return spead2.send.asyncio.UdpIbvStream(thread_pool, config, ibv_config)
    elif any(_is_multicast(endpoint[0]) for endpoint in endpoints_list):
        return spead2.send.asyncio.UdpStream(
            thread_pool, endpoints_list, config, ttl=ttl, interface_address=interface_address, buffer_size=buffer_size
        )
    else:
        # Honour the caller's socket buffer size for unicast too, instead of
        # silently falling back to the spead2 default.
        return spead2.send.asyncio.UdpStream(thread_pool, endpoints_list, config, buffer_size=buffer_size)
def make_stream(
    *,
    endpoints: Iterable[tuple[str, int]],
    heap_sets: Iterable[HeapSet],
    n_pols: int,
    adc_sample_rate: float,
    heap_samples: int,
    sample_bits: int,
    max_heaps: int,
    ttl: int,
    interface_address: str,
    ibv: bool,
    affinity: int,
    comp_vector: int,
    buffer_size: int,
) -> "spead2.send.asyncio.AsyncStream":
    """Create a spead2 stream for sending.

    The stream's rate and maximum packet size are derived from the sample
    rate and heap geometry, allowing for the per-packet header overhead.

    Parameters
    ----------
    endpoints
        Destinations (host and port) for all substreams
    heap_sets
        Heap sets whose payload arrays are passed to
        :func:`make_stream_base` as memory regions
    n_pols
        Number of single-pol streams to send
    adc_sample_rate
        Sample rate for each single-pol stream, in Hz
    heap_samples
        Number of samples to send in each heap (each heap will be sent as a
        single packet)
    sample_bits
        Number of bits per sample
    max_heaps
        Maximum number of heaps that may be in flight at once
    ttl
        IP TTL field
    interface_address
        IP address of the interface from which to send the data
    ibv
        If true, use ibverbs for acceleration
    affinity
        If non-negative, bind the sending thread to this CPU core
    comp_vector
        Completion vector for ibverbs
    buffer_size
        Buffer size, forwarded unchanged to :func:`make_stream_base`
    """
    preamble = 72  # SPEAD header, 4 standard item pointers, 4 application-specific item pointers
    heap_size = heap_samples * sample_bits // BYTE_BITS
    packet_size = heap_size + preamble
    # Ratio of bytes on the wire to payload bytes.
    overhead_ratio = packet_size / heap_size
    payload_rate = adc_sample_rate * n_pols * sample_bits / BYTE_BITS
    stream_config = spead2.send.StreamConfig(
        rate=payload_rate * overhead_ratio,
        max_packet_size=packet_size,
        max_heaps=max_heaps,
    )
    payload_arrays = [heap_set.data["payload"].data for heap_set in heap_sets]
    return make_stream_base(
        config=stream_config,
        endpoints=endpoints,
        ttl=ttl,
        interface_address=interface_address,
        ibv=ibv,
        affinity=affinity,
        comp_vector=comp_vector,
        buffer_size=buffer_size,
        memory_regions=payload_arrays,
    )
class Sender:
    """Manage sending packets.

    Parameters
    ----------
    stream
        Stream on which the heaps are sent
    heap_set
        Initial set of heaps to send; it can be replaced later with
        :meth:`set_heaps`
    heap_samples
        Timestamp increment between consecutive heaps
    """

    def __init__(
        self,
        stream: "spead2.send.asyncio.AsyncStream",
        heap_set: HeapSet,
        heap_samples: int,
    ) -> None:
        self.stream = stream
        self.heap_set = heap_set
        self.heap_samples = heap_samples
        self.time_converter = TimeConverter(0.0, 1.0)  # Dummy value; run() will initialise
        # The futures serve two functions:
        # - prevent concurrent access to the timestamps while they're being sent
        # - limiting the amount of data in flight
        self._futures: list[asyncio.Future[int] | None] = [None] * len(heap_set.parts)
        self._running = True  # Set to false to start shutdown
        self._finished = asyncio.Event()
        # First timestamp that we haven't yet submitted to async_send_heaps
        # (value is a dummy; real initial value is set by run)
        self._next_timestamp = 0

    def halt(self) -> None:
        """Request :meth:`run` to stop, but do not wait for it."""
        self._running = False

    async def join(self) -> None:
        """Wait for :meth:`run` to finish.

        This does not cause it to stop: use :meth:`halt` for that.
        """
        await self._finished.wait()

    async def stop(self) -> None:
        """Stop :meth:`run` and wait for it to finish."""
        self.halt()
        await self.join()

    def _update_metrics(self, end_timestamp: int, heaps: int, n_bytes: int, _future: asyncio.Future) -> None:
        """Update the Prometheus metrics after a batch of heaps has been sent.

        This is installed as a done-callback on the send future, with the
        first three arguments bound in advance.

        Parameters
        ----------
        end_timestamp
            Timestamp immediately following the batch
        heaps
            Number of heaps in the batch
        n_bytes
            Number of payload bytes in the batch
        _future
            The completed future (unused)
        """
        end_time = self.time_converter.adc_to_unix(end_timestamp)
        time_error_gauge.set(time.time() - end_time)
        output_heaps_counter.inc(heaps)
        output_bytes_counter.inc(n_bytes)

    async def run(self, first_timestamp: int, time_converter: TimeConverter) -> None:
        """Send heaps continuously.

        The parts of the heap set are sent in rotation until :meth:`halt` is
        called. Before a part is re-sent, its previous transmission is
        awaited and its timestamps are advanced by one full cycle of the
        heap set.

        The finished event is always set when this coroutine exits, even if
        it exits with an exception or is cancelled, so that :meth:`join` and
        :meth:`stop` cannot block forever.

        Parameters
        ----------
        first_timestamp
            Timestamp of the first heap to send
        time_converter
            Converter used to turn timestamps into UNIX times for the
            timing metric
        """
        self._next_timestamp = first_timestamp
        self.time_converter = time_converter
        try:
            # Prepare initial timestamps
            first_end_timestamp = first_timestamp + self.heap_set.data.sizes["time"] * self.heap_samples
            self.heap_set.data["timestamps"][:] = np.arange(
                first_timestamp,
                first_end_timestamp,
                self.heap_samples,
                dtype=spead.IMMEDIATE_DTYPE,
            )
            while self._running:
                for i, part in enumerate(self.heap_set.parts):
                    await asyncio.sleep(0)  # ensure other tasks get time to run
                    pending = self._futures[i]
                    if pending is not None:
                        # Shield so that cancelling run() does not cancel the
                        # underlying send.
                        await asyncio.shield(pending)
                        # set_heaps may have swapped heap_set out from under us during
                        # the await, so re-initialise part.
                        part = self.heap_set.parts[i]
                        # This part has been sent before, so move its
                        # timestamps on by one full cycle.
                        part["timestamps"] += self.heap_set.data.sizes["time"] * self.heap_samples
                    send_future = self.stream.async_send_heaps(
                        part.attrs["heap_reference_list"], spead2.send.GroupMode.SERIAL
                    )
                    self._futures[i] = send_future
                    self._next_timestamp += part.sizes["time"] * self.heap_samples
                    send_future.add_done_callback(
                        functools.partial(
                            self._update_metrics, self._next_timestamp, part["heaps"].size, part["payload"].nbytes
                        )
                    )
            # Drain whatever is still in flight before reporting completion.
            for future in self._futures:
                if future is not None:
                    await future
        finally:
            self._finished.set()  # Wake up join()

    async def set_heaps(self, heap_set: HeapSet) -> int:
        """Switch out the heap set for a different one.

        This does not return until the payload of the previous
        :class:`HeapSet` is no longer in use (the timestamps may still be in
        use). The new heap_set must share timestamps with the old one.

        Returns
        -------
        timestamp
            First timestamp which will use the new heap set

        Raises
        ------
        ValueError
            If the new heap set does not use the same timestamp array object
            as the current one.
        """
        if heap_set.data["timestamps"].data is not self.heap_set.data["timestamps"].data:
            raise ValueError("new heap set does not share timestamps with the old")
        # Snapshot the sends that may still reference the old payload before
        # swapping, so that only those are waited on.
        old_futures = [future for future in self._futures if future is not None]
        self.heap_set = heap_set
        timestamp = self._next_timestamp
        if old_futures:
            await asyncio.wait(old_futures)
        return timestamp