Source code for katgpucbf.recv

################################################################################
# Copyright (c) 2021-2026, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Shared utilities for receiving SPEAD data."""

import ctypes
import dataclasses
import logging
import time
import weakref
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator, Iterable, Mapping, Sequence
from dataclasses import dataclass
from typing import Any, ClassVar, Self

import aiokatcp
import numba.core.ccallback
import numpy as np
import scipy
import spead2.recv
import spead2.recv.asyncio
from numba import types
from prometheus_client import REGISTRY, CollectorRegistry, Counter, Metric
from prometheus_client.core import CounterMetricFamily
from prometheus_client.registry import Collector

from .utils import DeviceStatusSensor, TimeConverter, TimeoutSensorStatusObserver, make_rate_limited_sensor

logger = logging.getLogger(__name__)
user_data_type = types.Record.make_c_struct(
    [
        ("stats_base", types.uintp),  # Index for first custom statistic
    ]
)

#: Number of chunks before rx sensor status changes
RECV_SENSOR_TIMEOUT_CHUNKS = 10
#: Minimum recv sensor status timeout in seconds
RECV_SENSOR_TIMEOUT_MIN = 1.0
#: Eviction mode to use when some streams fall behind
EVICTION_MODE = spead2.recv.ChunkStreamGroupConfig.EvictionMode.LOSSY

type AnyStream = spead2.recv.ChunkRingStream | spead2.recv.ChunkStreamGroupMember


[docs] class Chunk(spead2.recv.Chunk): """Collection of heaps passed to the GPU at one time. It extends the spead2 base class to store a timestamp (computed from the chunk ID when the chunk is received), and optionally store a vkgdr device array. When used as a context manager, it will call :meth:`recycle` on exit. """ # Refine the types used in the base class present: np.ndarray data: np.ndarray extra: np.ndarray | None # New fields device: object timestamp: int sink: weakref.ref def __init__(self, *args, sink: spead2.recv.ChunkRingPair, device: object = None, **kwargs): super().__init__(*args, **kwargs) self.device = device self.timestamp = 0 # Actual value filled in when chunk received self.sink = weakref.ref(sink) def __enter__(self) -> Self: return self def __exit__(self, *exc_info: object) -> None: self.recycle()
[docs] def recycle(self) -> None: """Return the chunk to the owning stream/group.""" sink = self.sink() # If it is None, the sink has been garbage collected, and there is no # need to return the chunk. if sink is not None: sink.add_free_chunk(self)
[docs] class StatsCollector(Collector): """Collect statistics from spead2 streams as Prometheus metrics.""" class _GroupMemberWeakref: """Weak reference to a member of a stream group. This is complicated by the way spead2 exposes stream group members: accessing a group member returns a freshly-minted Python wrapper, which will disappear even if the group member itself still exists in C++. That makes :class:`spead2.recv.ChunkStreamGroupMember` unsuitable for use with weakrefs. Instead, we hold a weakref to the group. """ def __init__(self, stream_group: spead2.recv.ChunkStreamRingGroup, index: int) -> None: self._stream_group = weakref.ref(stream_group) self._index = index def __call__(self) -> spead2.recv.ChunkStreamGroupMember | None: """Obtain a strong reference to the stream, if it still exists.""" stream_group = self._stream_group() if stream_group is None: return None else: return stream_group[self._index] @dataclass class _StreamInfo: """Information about a single registered stream.""" stream: "weakref.ReferenceType[spead2.recv.ChunkRingStream] | StatsCollector._GroupMemberWeakref" indices: list[int] # Indices of counters, in the order given by counter_map prev: list[int] # Amounts already counted class _LabelSet: """Information shared by all streams with the same set of labels.""" labels: tuple[str, ...] totals: dict[str, int] # sum over all streams, indexed by stat name created: float # time of creation streams: list["StatsCollector._StreamInfo"] def __init__(self, labels: tuple[str, ...], stat_names: Iterable[str]) -> None: self.labels = labels self.totals = {stat_name: 0 for stat_name in stat_names} self.created = time.time() self.streams = [] def _add_stream( self, stream_weak: "weakref.ReferenceType[spead2.recv.ChunkRingStream] | StatsCollector._GroupMemberWeakref" ) -> None: stream = stream_weak() assert stream is not None # Caller guarantees it holds a strong reference config = stream.config indices = [config.get_stat_index(name) for name in self.totals.keys()] # Get the current statistics and immediately update with them stats = stream.stats prev = [] for i, stat_name in enumerate(self.totals.keys()): cur = stats[indices[i]] self.totals[stat_name] += cur prev.append(cur) self.streams.append(StatsCollector._StreamInfo(stream_weak, indices, prev)) def add_stream(self, stream: spead2.recv.ChunkRingStream) -> None: """Register a new stream.""" self._add_stream(weakref.ref(stream)) def add_stream_group(self, stream_group: spead2.recv.ChunkStreamRingGroup) -> None: for i in range(len(stream_group)): self._add_stream(StatsCollector._GroupMemberWeakref(stream_group, i)) def update(self) -> None: """Fetch statistics from all streams and update totals.""" # We build a new copy of the streams list which excludes any that # were garbage collected. new_streams = [] for stream_info in self.streams: stream = stream_info.stream() if stream is not None: new_streams.append(stream_info) stats = stream.stats for i, stat_name in enumerate(self.totals.keys()): cur = stats[stream_info.indices[i]] self.totals[stat_name] += cur - stream_info.prev[i] stream_info.prev[i] = cur self.streams = new_streams @staticmethod def _build_full_name(namespace: str, name: str) -> str: """Combine the (optional) namespace with the name.""" full_name = "" if namespace: full_name += namespace + "_" full_name += name return full_name def __init__( self, counter_map: Mapping[str, tuple[str, str]], labelnames: Iterable[str] = (), namespace: str = "", registry: CollectorRegistry = REGISTRY, ) -> None: self._counter_map = { stat_name: (self._build_full_name(namespace, name), description) for stat_name, (name, description) in counter_map.items() } self._labelnames = tuple(labelnames) self._label_sets: dict[tuple[str, ...], StatsCollector._LabelSet] = {} # If there are no labels, then we can instantiate the counters # immediately since we know their full names. if self._labelnames == (): self.add_label_set(()) if registry: registry.register(self)
[docs] def add_label_set(self, labels: Iterable[str]) -> tuple[str, ...]: """Ensure that statistics exist for a given label set. This is automatically done by :meth:`add_stream` and :meth:`add_stream_group`. It may be useful if the streams will only be added later but the statistics are required now. It is a no-op to call this function with a label set that already exists. Returns ------- labels The input labels as a tuple """ labels_tuple = tuple(labels) if len(labels_tuple) != len(self._labelnames): raise ValueError("labels must have the same length as labelnames") if labels_tuple not in self._label_sets: self._label_sets[labels_tuple] = self._LabelSet(labels_tuple, self._counter_map.keys()) return labels_tuple
[docs] def update(self) -> None: """Update the internal totals from the streams. This is done automatically by :meth:`collect`, but it can also be called explicitly. This may be useful to do just before a stream goes out of scope, to ensure that counter updates since the last scrape are not lost when the stream is garbage collected. """ for label_set in self._label_sets.values(): label_set.update()
[docs] def add_stream(self, stream: spead2.recv.ChunkRingStream, labels: Iterable[str] = ()) -> None: """Register a new stream. If the collector was constructed with a non-empty ``labelnames``, then ``labels`` must contain the same number of elements to provide the labels for the metrics that this stream will update. .. warning:: Calling this more than once with the same stream will cause that stream's statistics to be counted multiple times. """ labels_tuple = self.add_label_set(labels) self._label_sets[labels_tuple].add_stream(stream)
[docs] def add_stream_group(self, stream_group: spead2.recv.ChunkStreamRingGroup, labels: Iterable[str] = ()) -> None: """Register all the streams in a stream group. If the collector was constructed with a non-empty ``labelnames``, then ``labels`` must contain the same number of elements to provide the labels for the metrics that this stream will update. .. warning:: Calling this more than once with the same stream group will cause that group's statistics to be counted multiple times. """ labels_tuple = self.add_label_set(labels) self._label_sets[labels_tuple].add_stream_group(stream_group)
[docs] def collect(self) -> Iterable[Metric]: """Implement Prometheus' Collector interface.""" self.update() for stat_name, (counter_name, counter_help) in self._counter_map.items(): metric = CounterMetricFamily(counter_name, counter_help, labels=self._labelnames) for labels, label_set in self._label_sets.items(): metric.add_metric(labels, label_set.totals[stat_name], created=label_set.created) yield metric
[docs] class BaseLayout(ABC): """Abstract base class for chunk layouts to derive from.""" @property @abstractmethod def heap_bytes(self) -> int: """Number of payload bytes per heap.""" ... @property @abstractmethod def chunk_batches(self) -> int: """Number of batches per chunk.""" ... @property @abstractmethod def batch_heaps(self) -> int: """Number of heaps per chunk, on the axes other than time.""" ... @property @abstractmethod def heap_sample_count(self) -> int: """Number of samples per heap. The meaning of samples is up to the concrete subclass. It is used only for statistics. """ ... @property @abstractmethod def chunk_timestamp_step(self) -> int: """Expected increase in timestamp from one chunk to the next.""" @property def chunk_heaps(self) -> int: """Number of heaps per chunk.""" return self.chunk_batches * self.batch_heaps @property def chunk_bytes(self) -> int: """Number of bytes per chunk.""" return self.heap_bytes * self.batch_heaps * self.chunk_batches @property @abstractmethod def _chunk_place(self) -> numba.core.ccallback.CFunc: ...
[docs] def chunk_place(self, user_data: np.ndarray) -> scipy.LowLevelCallable: """Generate low-level code for placing heaps in chunks. Parameters ---------- user_data Data to pass to the placement callback """ return scipy.LowLevelCallable( self._chunk_place.ctypes, user_data=user_data.ctypes.data_as(ctypes.c_void_p), signature="void (void *, size_t, void *)", )
[docs] def make_stream( *, layout: BaseLayout, spead_items: list[int], max_active_chunks: int, data_ringbuffer: spead2.recv.asyncio.ChunkRingbuffer, free_ringbuffer: spead2.recv.ChunkRingbuffer, affinity: int, stream_stats: list[str], user_data: np.ndarray, max_heap_extra: int = 0, **kwargs: Any, ) -> spead2.recv.ChunkRingStream: """Create a SPEAD receiver stream. Parameters ---------- layout Heap size and chunking parameters. spead_items List of SPEAD item IDs to be expected in the heap headers. max_active_chunks Maximum number of chunks under construction. data_ringbuffer Output ringbuffer to which chunks will be sent. free_ringbuffer Ringbuffer for holding chunks for recycling once they've been used. affinity CPU core affinity for the worker thread (negative to not set an affinity). stream_stats Stats to hook up to prometheus. user_data Data to pass to the chunk placement callback. It must have a record type with a `stats_base` element, which will be populated with the index of the first custom statistic. max_heap_extra Maximum non-payload data written by the place callback kwargs Other keyword arguments are passed to :class:`spead2.recv.StreamConfig`. """ stream_config = spead2.recv.StreamConfig(memcpy=spead2.MEMCPY_NONTEMPORAL, **kwargs) user_data["stats_base"] = stream_config.next_stat_index() for stat in stream_stats: stream_config.add_stat(stat) chunk_stream_config = spead2.recv.ChunkStreamConfig( items=spead_items, max_chunks=max_active_chunks, max_heap_extra=max_heap_extra, place=layout.chunk_place(user_data), ) return spead2.recv.ChunkRingStream( spead2.ThreadPool(1, [] if affinity < 0 else [affinity]), stream_config, chunk_stream_config, data_ringbuffer, free_ringbuffer, )
[docs] def make_stream_group( *, layout: BaseLayout, spead_items: list[int], max_active_chunks: int, data_ringbuffer: spead2.recv.asyncio.ChunkRingbuffer, free_ringbuffer: spead2.recv.ChunkRingbuffer, affinity: Sequence[int], stream_stats: list[str], user_data: np.ndarray, max_heap_extra: int = 0, **kwargs: Any, ) -> spead2.recv.ChunkStreamRingGroup: """Create a group of SPEAD receiver streams. Parameters ---------- layout Heap size and chunking parameters. spead_items List of SPEAD item IDs to be expected in the heap headers. max_active_chunks Maximum number of chunks under construction. data_ringbuffer Output ringbuffer to which chunks will be sent. free_ringbuffer Ringbuffer for holding chunks for recycling once they've been used. affinity CPU core affinities for the worker threads (negative to not set an affinity). The length of this list determines the number of streams to create. stream_stats Stats to hook up to prometheus. user_data User data to pass to the chunk callback. It must have a field called `stats_base`, which will be filled in appropriately (modifying the argument). The length must be the same as the length of `affinity` (i.e., one element per stream). max_heap_extra Maximum non-payload data written by the place callback kwargs Other keyword arguments are passed to :class:`spead2.recv.StreamConfig`. """ stream_config = spead2.recv.StreamConfig(memcpy=spead2.MEMCPY_NONTEMPORAL, **kwargs) user_data["stats_base"] = stream_config.next_stat_index() for stat in stream_stats: stream_config.add_stat(stat) max_chunks = max_active_chunks # If there is more than one stream in the group, allow the group to have # one extra active chunk to reduce inter-thread communication. if len(affinity) > 1: max_chunks += 1 group_config = spead2.recv.ChunkStreamGroupConfig(max_chunks=max_chunks, eviction_mode=EVICTION_MODE) group = spead2.recv.ChunkStreamRingGroup(group_config, data_ringbuffer, free_ringbuffer) for core, stream_user_data in zip(affinity, user_data, strict=True): thread_pool = spead2.ThreadPool(1, [] if core < 0 else [core]) chunk_stream_config = spead2.recv.ChunkStreamConfig( items=spead_items, max_chunks=max_active_chunks, max_heap_extra=max_heap_extra, place=layout.chunk_place(np.asarray(stream_user_data)), ) group.emplace_back(thread_pool, stream_config, chunk_stream_config) return group
[docs] def add_reader( stream: AnyStream, *, src: str | list[tuple[str, int]], interface: str | None, ibv: bool, comp_vector: int, buffer_size: int, ) -> None: """Connect a stream to an underlying transport. See the documentation for :class:`~.FEngine` for an explanation of the parameters. """ if isinstance(src, str): stream.add_udp_pcap_file_reader(src) elif ibv: if interface is None: raise ValueError("--recv-interface is required with --recv-ibv") ibv_config = spead2.recv.UdpIbvConfig( endpoints=src, interface_address=interface, buffer_size=buffer_size, comp_vector=comp_vector, ) stream.add_udp_ibv_reader(ibv_config) else: buffer_size = buffer_size // len(src) # split it across the endpoints for endpoint in src: stream.add_udp_reader( endpoint[0], endpoint[1], buffer_size=buffer_size, interface_address=interface or "", )
[docs] def make_sensors(sensor_timeout: float, prefixes: Sequence[str] = ("",)) -> aiokatcp.SensorSet: """Create the sensors needed to hold receiver statistics. Parameters ---------- sensor_timeout Time (in seconds) without updates before sensors for received data go into error and sensors for missing data become nominal. prefixes Prefixes to prepend to sensor names (e.g., for separate polarisations), including the trailing dot if non-empty. """ sensors = aiokatcp.SensorSet() for prefix in prefixes: timestamp_sensors: list[aiokatcp.Sensor] = [ make_rate_limited_sensor( int, f"{prefix}rx.timestamp", "The timestamp (in samples) of the last chunk of data received", default=-1, initial_status=aiokatcp.Sensor.Status.ERROR, ), make_rate_limited_sensor( aiokatcp.core.Timestamp, f"{prefix}rx.unixtime", "The timestamp (in UNIX time) of the last chunk of data received", default=aiokatcp.core.Timestamp(-1.0), initial_status=aiokatcp.Sensor.Status.ERROR, ), ] for sensor in timestamp_sensors: TimeoutSensorStatusObserver(sensor, sensor_timeout, aiokatcp.Sensor.Status.ERROR) sensors.add(sensor) missing_sensors: list[aiokatcp.Sensor] = [ make_rate_limited_sensor( aiokatcp.core.Timestamp, f"{prefix}rx.missing-unixtime", "The timestamp (in UNIX time) when missing data was last detected", default=aiokatcp.core.Timestamp(-1.0), initial_status=aiokatcp.Sensor.Status.NOMINAL, ) ] for sensor in missing_sensors: TimeoutSensorStatusObserver(sensor, sensor_timeout, aiokatcp.Sensor.Status.NOMINAL) sensors.add(sensor) sensors.add(DeviceStatusSensor(sensors, "rx.device-status", "Engine is receiving a good, clean data stream")) return sensors
[docs] @dataclass class Counters: """Prometheus counters for received data. Each module should define a single global instance. """ heaps: Counter chunks: Counter samples: Counter bytes: Counter missing_heaps: Counter clipped_samples: Counter | None = None #: Counters that should be instanced by :meth:`labels` PER_POL_COUNTERS: ClassVar = ["heaps", "samples", "bytes", "missing_heaps", "clipped_samples"]
[docs] def labels(self, *labelvalues, **labelkwargs) -> Self: """Apply labels to counters listed in PER_POL_COUNTERS.""" replace = {} for name in self.PER_POL_COUNTERS: counter = getattr(self, name) if isinstance(counter, Counter): replace[name] = counter.labels(*labelvalues, **labelkwargs) return dataclasses.replace(self, **replace)
[docs] async def iter_chunks( ringbuffer: spead2.recv.asyncio.ChunkRingbuffer, layout: BaseLayout, sensors: aiokatcp.SensorSet, time_converter: TimeConverter, pols: Sequence[tuple[str, str]] | None, counters: Counters, stats_collector: StatsCollector, ) -> AsyncGenerator[Chunk, None]: """Iterate over the chunks and update sensors. It also populates the chunk timestamp. If `counters` has a :attr:`~Counters.clipped_samples` counter, the chunks must have an `extra` field containing saturation counts, which will be used to populate that counter. Parameters ---------- ringbuffer Source of chunks. layout Structure of the streams. sensors Sensor set containing at least the sensors created by :func:`make_sensors`. time_converter Converter to turn data timestamps into sensor timestamps. pols List of polarisation labels, or ``None`` if the sensors are not separated by polarisation. If polarisations are separated, this must be the first axis in :attr:`Chunk.data` and :attr:`Chunk.present`. Each element is tuple of the Prometheus label and the katcp sensor prefix. counters Prometheus counters stats_collector Statistics collector. This must already be associated with the stream or stream group, but this function will ensure it is updated at the end of the stream. """ lost = 0 first_timestamp = -1 # Updated to the actual first timestamp on the first chunk # These duplicate the Prometheus counters, because prometheus_client # doesn't provide an efficient way to get the current value # (REGISTRY.get_sample_value is documented as being intended only for unit # tests). n_pols = len(pols) if pols is not None else 1 n_heaps = [0] * n_pols n_missing_heaps = [0] * n_pols # `try`/`finally` block acting as a quick-and-dirty context manager, # to ensure that we clean up nicely after ourselves if we are stopped. try: async for chunk in ringbuffer: assert isinstance(chunk, Chunk) # Inspect the chunk we have just received. chunk.timestamp = chunk.chunk_id * layout.chunk_timestamp_step # Cast to int isn't strictly necessary, but keeps all the accounting # in Python types instead of a hodge-podge of native and numpy types. good = int(np.sum(chunk.present)) if not good: # Dummy chunk created by spead2 chunk.recycle() continue if first_timestamp == -1: # TODO: use chunk.present to determine the actual first timestamp first_timestamp = chunk.timestamp lost += chunk.present.size - good logger.debug( "Received chunk: timestamp=%#x (%d/%d, lost %d)", chunk.timestamp, good, chunk.present.size, lost, ) unix_time = time_converter.adc_to_unix(chunk.timestamp) unix_time_katcp = aiokatcp.core.Timestamp(unix_time) expected_chunks = (chunk.timestamp - first_timestamp) // layout.chunk_timestamp_step + 1 pol_expected_heaps = expected_chunks * layout.chunk_batches * layout.batch_heaps // n_pols counters.chunks.inc() # Zero out saturation count for heaps that were never received # (otherwise the value is undefined memory). if chunk.extra is not None: chunk.extra[chunk.present == 0] = 0 for pol in range(n_pols): # The cast is to force numpy ints to Python ints. if pols is not None: pol_index: tuple[int, ...] = (pol,) pol_counters = counters.labels(pols[pol][0]) pol_prefix = pols[pol][1] + "." else: pol_index = () pol_counters = counters pol_prefix = "" buf_good = int(np.sum(chunk.present[pol_index])) pol_counters.heaps.inc(buf_good) pol_counters.samples.inc(buf_good * layout.heap_sample_count) pol_counters.bytes.inc(buf_good * layout.heap_bytes) if pol_counters.clipped_samples is not None and chunk.extra is not None: pol_counters.clipped_samples.inc(int(np.sum(chunk.extra[pol_index], dtype=np.uint64))) # Determine how many heaps we expected to have seen by # now, and subtract from it the number actually seen to # determine the number missing. This accounts for both # heaps lost within chunks and lost chunks. n_heaps[pol] += buf_good new_missing = pol_expected_heaps - n_heaps[pol] if new_missing > n_missing_heaps[pol]: pol_counters.missing_heaps.inc(new_missing - n_missing_heaps[pol]) n_missing_heaps[pol] = new_missing sensors[f"{pol_prefix}rx.missing-unixtime"].set_value( unix_time_katcp, timestamp=unix_time, status=aiokatcp.Sensor.Status.ERROR ) for pol in range(n_pols): pol_prefix = pols[pol][1] + "." if pols is not None else "" # Note: these must be set AFTER rx.missing-unixtime so that if # the first chunk received is missing data, we don't have an # intermediate state in which all the sensors are NOMINAL # (which would cause rx.device-status to be NOMINAL). sensors[f"{pol_prefix}rx.timestamp"].set_value(chunk.timestamp, timestamp=unix_time) sensors[f"{pol_prefix}rx.unixtime"].set_value(unix_time_katcp, timestamp=unix_time) yield chunk finally: stats_collector.update() # Ensure final stats updates are captured