Source code for katgpucbf.fgpu.recv

################################################################################
# Copyright (c) 2020-2025, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Recv module."""

import functools
import logging
from collections.abc import AsyncGenerator, Sequence
from dataclasses import dataclass
from enum import IntEnum

import aiokatcp
import numba
import numpy as np
import spead2.recv.asyncio
from numba import types
from prometheus_client import Counter
from spead2.numba import intp_to_voidptr
from spead2.recv.numba import chunk_place_data

from .. import BYTE_BITS, N_POLS
from .. import recv as base_recv
from ..recv import BaseLayout, Chunk, StatsCollector
from ..spead import DIGITISER_ID_ID, DIGITISER_STATUS_ID, DIGITISER_STATUS_SATURATION_COUNT_SHIFT, TIMESTAMP_ID
from ..utils import DeviceStatusSensor, TimeConverter, TimeoutSensorStatusObserver, make_rate_limited_sensor
from . import METRIC_NAMESPACE

#: Number of partial chunks to allow at a time. Using 1 would reject any out-of-order
#: heaps (which can happen with a multi-path network). 2 is sufficient provided heaps
#: are not delayed by a whole chunk. Passed as ``max_active_chunks`` when the stream
#: group is created in :func:`make_stream_group`.
MAX_CHUNKS = 2

#: Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Prometheus counters describing the input stream. All of them except
# ``chunks_counter`` carry a ``pol`` label so that values are tracked
# separately per polarisation. They are incremented in iter_chunks.
heaps_counter = Counter("input_heaps", "number of heaps received", ["pol"], namespace=METRIC_NAMESPACE)
chunks_counter = Counter("input_chunks", "number of chunks received", namespace=METRIC_NAMESPACE)
samples_counter = Counter("input_samples", "number of digitiser samples received", ["pol"], namespace=METRIC_NAMESPACE)
bytes_counter = Counter(
    "input_bytes", "number of bytes of digitiser samples received", ["pol"], namespace=METRIC_NAMESPACE
)
missing_heaps_counter = Counter(
    "input_missing_heaps", "number of heaps dropped on the input", ["pol"], namespace=METRIC_NAMESPACE
)
dig_clip_counter = Counter(
    "input_clipped_samples", "number of ADC samples that clipped", ["pol"], namespace=METRIC_NAMESPACE
)
# Every counter that has a ``pol`` label. make_stream_group walks this list
# to instantiate each label value before the first scrape.
_PER_POL_COUNTERS = [
    heaps_counter,
    samples_counter,
    bytes_counter,
    missing_heaps_counter,
    dig_clip_counter,
]

# Maps stream statistic names to a (metric name, description) pair.
# The two ``katgpucbf.*`` names are the custom statistics requested via
# ``stream_stats`` in make_stream_group and incremented by the chunk
# placement callback; ``too_old_heaps`` is presumably a statistic provided by
# spead2 itself (NOTE(review): confirm against StatsCollector/spead2).
stats_collector = StatsCollector(
    {
        "too_old_heaps": ("input_too_old_heaps", "number of heaps that arrived too late to be processed"),
        "katgpucbf.metadata_heaps": ("input_metadata_heaps", "number of heaps not containing payload"),
        "katgpucbf.bad_timestamp_heaps": (
            "input_bad_timestamp_heaps",
            "timestamp not a multiple of samples per packet",
        ),
    },
    namespace=METRIC_NAMESPACE,
)

# Numba record type for the user data handed to the chunk placement callback
# (it appears as ``types.CPointer(user_data_type)`` in the callback signature).
# make_stream_group allocates one element of this type and fills in ``stride``;
# ``stats_base`` is not set in this module, so it is presumably filled in by
# the shared receiver code (NOTE(review): confirm in ..recv).
user_data_type = types.Record.make_c_struct(
    [
        ("stats_base", types.size_t),  # Index for first custom statistic
        ("stride", types.size_t),  # Bytes between polarisations in payload array
    ]
)


class _Statistic(IntEnum):
    """Custom statistics for the SPEAD receiver.

    Each value is an offset that the chunk placement callback adds to
    ``stats_base`` to index the stream's batch statistics array.
    """

    # Note: the values are important and must match the registration order
    # of the statistics (the order of ``stream_stats`` in make_stream_group:
    # metadata heaps first, then bad-timestamp heaps).
    METADATA_HEAPS = 0
    BAD_TIMESTAMP_HEAPS = 1


@dataclass(frozen=True)
class Layout(BaseLayout):
    """Parameters controlling the sizes of heaps and chunks."""

    # Number of bits in each digitiser sample.
    sample_bits: int
    # Number of samples in each heap.
    heap_samples: int
    # Number of samples in each chunk, along the time axis.
    chunk_samples: int
    # If true, the bits covered by ``heap_samples - 1`` are cleared from
    # incoming timestamps before they are validated (see timestamp_mask).
    mask_timestamp: bool

    @property
    def heap_bytes(self) -> int:
        """Number of payload bytes per heap."""
        return self.heap_samples * self.sample_bits // BYTE_BITS

    @property
    def chunk_heaps(self) -> int:
        """Number of heaps per chunk, on time axis."""
        return self.chunk_samples // self.heap_samples

    @property
    def timestamp_mask(self) -> np.uint64:
        """Mask to AND with incoming timestamps.

        When :attr:`mask_timestamp` is false the mask is all ones, so
        timestamps pass through unchanged.
        """
        return ~np.uint64(self.heap_samples - 1 if self.mask_timestamp else 0)

    @functools.cached_property
    def _chunk_place(self) -> numba.core.ccallback.CFunc:
        """Low-level code for placing heaps in chunks.

        The returned callback reads four item values per heap (timestamp,
        heap length, digitiser status and digitiser ID, in that order) and
        either fills in the placement fields of the ``chunk_place_data``
        structure or increments one of the custom statistics and returns
        without placing the heap.
        """
        # Copy everything into locals so that the compiled function captures
        # plain values rather than ``self``.
        heap_samples = self.heap_samples
        heap_bytes = self.heap_bytes
        chunk_heaps = self.chunk_heaps
        chunk_samples = self.chunk_samples
        timestamp_mask = self.timestamp_mask
        n_statistics = len(_Statistic)

        # The second argument is a size_t in the C signature; it is declared
        # as uintp here on the assumption that the two have the same width.
        @numba.cfunc(
            types.void(types.CPointer(chunk_place_data), types.uintp, types.CPointer(user_data_type)),
            nopython=True,
        )
        def chunk_place_impl(data_ptr, data_size, user_data_ptr):
            data = numba.carray(data_ptr, 1)
            # Four items are requested from the stream (timestamp, heap length,
            # digitiser status, digitiser ID), so the array must have four
            # elements for items[3] to be a valid access.
            items = numba.carray(intp_to_voidptr(data[0].items), 4, dtype=np.int64)
            timestamp = items[0]
            payload_size = items[1]
            status = items[2]
            pol = items[3] & 1  # Polarisation is LSB of digitiser ID field
            user_data = numba.carray(user_data_ptr, 1)
            batch_stats = numba.carray(
                intp_to_voidptr(data[0].batch_stats),
                user_data[0].stats_base + n_statistics,
                dtype=np.uint64,
            )
            if payload_size != heap_bytes or timestamp < 0:
                # It's something unexpected - maybe it has descriptors or a stream
                # control item. Ignore it.
                batch_stats[user_data[0].stats_base + _Statistic.METADATA_HEAPS] += 1
                return
            timestamp &= timestamp_mask
            if timestamp % heap_samples != 0:
                batch_stats[user_data[0].stats_base + _Statistic.BAD_TIMESTAMP_HEAPS] += 1
                return
            data[0].chunk_id = timestamp // chunk_samples
            heap_index = timestamp // heap_samples % chunk_heaps
            # Heaps are indexed with polarisation as the slower-varying axis.
            data[0].heap_index = heap_index + pol * chunk_heaps
            # user_data is a 1-element array of records: index it before
            # reading the field, as is done for stats_base above.
            data[0].heap_offset = heap_index * heap_bytes + pol * user_data[0].stride
            # Store the saturation count from the status item as per-heap
            # extra data (one uint16 per heap).
            extra = numba.carray(intp_to_voidptr(data[0].extra), 1, dtype=np.uint16)
            data[0].extra_offset = data[0].heap_index * 2  # 2 is sizeof(uint16)
            data[0].extra_size = 2
            extra[0] = status >> DIGITISER_STATUS_SATURATION_COUNT_SHIFT

        return chunk_place_impl


def make_stream_group(
    layout: Layout,
    data_ringbuffer: spead2.recv.asyncio.ChunkRingbuffer,
    free_ringbuffer: spead2.recv.ChunkRingbuffer,
    recv_affinity: Sequence[int],
    stride: int,
) -> spead2.recv.ChunkStreamRingGroup:
    """Build the SPEAD receive stream group for digitiser data.

    This is a thin wrapper around the shared receiver helper that supplies
    the F-engine-specific settings, and registers the resulting group with
    the module's statistics collector.

    Parameters
    ----------
    layout
        Heap size and chunking parameters.
    data_ringbuffer
        Output ringbuffer to which chunks will be sent.
    free_ringbuffer
        Ringbuffer for holding chunks for recycling once they've been used.
    recv_affinity
        CPU core affinity for the worker threads (one per thread). Use
        -1 to indicate no affinity for a thread.
    stride
        Bytes between polarisations in chunk payload array

    Returns
    -------
    The newly created stream group.
    """
    # Touch every (counter, pol) label so that it exists before the first scrape
    for per_pol_counter in _PER_POL_COUNTERS:
        for pol in range(N_POLS):
            per_pol_counter.labels(pol)

    # Single-element record array handed to the chunk placement callback
    place_user_data = np.zeros(1, dtype=user_data_type.dtype)
    place_user_data["stride"] = stride

    wanted_items = [TIMESTAMP_ID, spead2.HEAP_LENGTH_ID, DIGITISER_STATUS_ID, DIGITISER_ID_ID]
    custom_stats = ["katgpucbf.metadata_heaps", "katgpucbf.bad_timestamp_heaps"]
    extra_bytes_per_heap = np.dtype(np.uint16).itemsize

    stream_group = base_recv.make_stream_group(
        layout=layout,
        spead_items=wanted_items,
        max_active_chunks=MAX_CHUNKS,
        max_heap_extra=extra_bytes_per_heap,
        data_ringbuffer=data_ringbuffer,
        free_ringbuffer=free_ringbuffer,
        affinity=recv_affinity,
        max_heaps=1,  # Digitiser heaps are single-packet, so no need for more
        stream_stats=custom_stats,
        user_data=place_user_data,
        explicit_start=True,
    )
    stats_collector.add_stream_group(stream_group)
    return stream_group


def make_sensors(sensor_timeout: float) -> aiokatcp.SensorSet:
    """Create the sensors needed to hold receiver statistics.

    For each polarisation there are two "last received" sensors (sample
    timestamp and UNIX time) and one "last missing data" sensor. A device
    status sensor built over the whole set is added last.

    Parameters
    ----------
    sensor_timeout
        Time (in seconds) without updates before sensors for received data go
        into error and sensors for missing data become nominal.
    """
    Status = aiokatcp.Sensor.Status
    sensors = aiokatcp.SensorSet()

    def watch_and_add(group: list[aiokatcp.Sensor], timeout_status: aiokatcp.Sensor.Status) -> None:
        # Attach the timeout observer first, then put the sensor in the set.
        for sensor in group:
            TimeoutSensorStatusObserver(sensor, sensor_timeout, timeout_status)
            sensors.add(sensor)

    for pol in range(N_POLS):
        # Sensors reporting the most recent data received: start in ERROR and
        # fall back to ERROR when no update arrives within the timeout.
        received: list[aiokatcp.Sensor] = [
            make_rate_limited_sensor(
                int,
                f"input{pol}.rx.timestamp",
                "The timestamp (in samples) of the last chunk of data received from the digitiser",
                default=-1,
                initial_status=Status.ERROR,
            ),
            make_rate_limited_sensor(
                aiokatcp.core.Timestamp,
                f"input{pol}.rx.unixtime",
                "The timestamp (in UNIX time) of the last chunk of data received from the digitiser",
                default=aiokatcp.core.Timestamp(-1.0),
                initial_status=Status.ERROR,
            ),
        ]
        watch_and_add(received, Status.ERROR)

        # Sensor reporting the most recent loss: starts NOMINAL and returns to
        # NOMINAL when no update arrives within the timeout.
        missing: list[aiokatcp.Sensor] = [
            make_rate_limited_sensor(
                aiokatcp.core.Timestamp,
                f"input{pol}.rx.missing-unixtime",
                "The timestamp (in UNIX time) when missing data was last detected",
                default=aiokatcp.core.Timestamp(-1.0),
                initial_status=Status.NOMINAL,
            )
        ]
        watch_and_add(missing, Status.NOMINAL)

    device_status = DeviceStatusSensor(
        sensors, "rx.device-status", "F-engine is receiving a good, clean digitiser stream"
    )
    sensors.add(device_status)
    return sensors


async def iter_chunks(
    ringbuffer: spead2.recv.asyncio.ChunkRingbuffer,
    layout: Layout,
    sensors: aiokatcp.SensorSet,
    time_converter: TimeConverter,
) -> AsyncGenerator[Chunk, None]:
    """Yield chunks from a ringbuffer, updating counters and sensors.

    The timestamp of each chunk is filled in before it is yielded. Chunks
    with no heaps present are recycled and not yielded.

    Parameters
    ----------
    ringbuffer
        Source of chunks.
    layout
        Structure of the streams.
    sensors
        Sensor set containing at least the sensors created by
        :func:`make_sensors`.
    time_converter
        Converter to turn data timestamps into sensor timestamps.
    """
    Status = aiokatcp.Sensor.Status
    lost = 0
    first_timestamp = -1  # Replaced by the real first timestamp on the first chunk
    # Local shadows of the Prometheus counters: prometheus_client has no
    # efficient way to read back the current value
    # (REGISTRY.get_sample_value is documented as being intended only for
    # unit tests).
    heaps_seen = [0] * N_POLS
    heaps_missing = [0] * N_POLS
    # The try/finally stands in for a context manager, so that the final
    # statistics are collected even if iteration is stopped early.
    try:
        async for chunk in ringbuffer:
            assert isinstance(chunk, Chunk)
            chunk.timestamp = chunk.chunk_id * layout.chunk_samples
            present = chunk.present
            good = np.sum(present)
            if not good:
                # Dummy chunk created by spead2
                chunk.recycle()
                continue
            if first_timestamp == -1:
                # TODO: use chunk.present to determine the actual first timestamp
                first_timestamp = chunk.timestamp
            lost += present.size - good
            logger.debug(
                "Received chunk: timestamp=%#x (%d/%d, lost %d)",
                chunk.timestamp,
                good,
                present.size,
                lost,
            )
            unix_time = time_converter.adc_to_unix(chunk.timestamp)
            unix_time_katcp = aiokatcp.core.Timestamp(unix_time)
            # Heaps per polarisation that should have arrived from the first
            # timestamp up to the end of this chunk.
            elapsed_samples = chunk.timestamp - first_timestamp + layout.chunk_samples
            pol_expected_heaps = elapsed_samples // layout.heap_samples
            chunks_counter.inc()

            # Heaps that never arrived leave their saturation count as
            # undefined memory, so clear those entries.
            assert chunk.extra is not None
            chunk.extra[present == 0] = 0

            for pol in range(N_POLS):
                # int() forces numpy ints to Python ints.
                pol_good = int(np.sum(present[pol]))
                pol_clipped = int(np.sum(chunk.extra[pol], dtype=np.uint64))
                heaps_counter.labels(pol).inc(pol_good)
                samples_counter.labels(pol).inc(pol_good * layout.heap_samples)
                bytes_counter.labels(pol).inc(pol_good * layout.heap_bytes)
                dig_clip_counter.labels(pol).inc(pol_clipped)

                # Missing = expected so far minus actually seen. This covers
                # both heaps lost inside chunks and chunks lost entirely.
                heaps_seen[pol] += pol_good
                missing_now = pol_expected_heaps - heaps_seen[pol]
                if missing_now <= heaps_missing[pol]:
                    continue
                missing_heaps_counter.labels(pol).inc(missing_now - heaps_missing[pol])
                heaps_missing[pol] = missing_now
                sensors[f"input{pol}.rx.missing-unixtime"].set_value(
                    unix_time_katcp, timestamp=unix_time, status=Status.ERROR
                )

            # Note: these must be set AFTER rx.missing-unixtime so that if
            # the first chunk received is missing data, we don't have an
            # intermediate state in which all the sensors are NOMINAL
            # (which would cause rx.device-status to be NOMINAL).
            for pol in range(N_POLS):
                sensors[f"input{pol}.rx.timestamp"].set_value(chunk.timestamp, timestamp=unix_time)
                sensors[f"input{pol}.rx.unixtime"].set_value(unix_time_katcp, timestamp=unix_time)

            yield chunk
    finally:
        stats_collector.update()  # Ensure final stats updates are captured


# Public names of this module. ``Chunk`` is imported from ``..recv`` and
# re-exported here.
__all__ = ["Chunk", "Layout", "iter_chunks"]