# Source code for katgpucbf.vgpu.recv

################################################################################
# Copyright (c) 2025, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Handle receiving tied-array-channelised-voltage data."""

import functools
from collections.abc import AsyncGenerator, Sequence
from dataclasses import dataclass
from enum import IntEnum

import aiokatcp
import numba
import numpy as np
import spead2.recv.asyncio
from numba import types
from prometheus_client import Counter
from spead2.numba import intp_to_voidptr
from spead2.recv.numba import chunk_place_data

from .. import BYTE_BITS, COMPLEX, N_POLS
from .. import recv as base_recv
from ..recv import BaseLayout, Chunk, Counters, StatsCollector
from ..spead import BEAM_ANTS_ID, FREQUENCY_ID, TIMESTAMP_ID
from ..utils import TimeConverter
from . import METRIC_NAMESPACE

#: Number of chunks to allow to be under construction at any one time. It is
#: passed as ``max_active_chunks`` when the stream group is created.
MAX_CHUNKS = 2  # TODO: may need to increase to tolerate reordering

#: Prometheus counters describing the received data. Every counter except
#: ``chunks`` carries a ``pol`` label.
counters = Counters(
    heaps=Counter("input_heaps", "number of heaps received", ["pol"], namespace=METRIC_NAMESPACE),
    chunks=Counter("input_chunks", "number of chunks received", namespace=METRIC_NAMESPACE),
    samples=Counter("input_samples", "number of complex voltage samples received", ["pol"], namespace=METRIC_NAMESPACE),
    bytes=Counter("input_bytes", "number of bytes of input data received", ["pol"], namespace=METRIC_NAMESPACE),
    missing_heaps=Counter(
        "input_missing_heaps", "number of heaps dropped on the input", ["pol"], namespace=METRIC_NAMESPACE
    ),
)

#: Collector for receiver stream statistics. Each key is the name of a stream
#: statistic and each value is a (metric name, description) pair. The
#: ``katgpucbf.``-prefixed names are the custom statistics requested by
#: :func:`make_stream_group`; the un-prefixed names are presumably statistics
#: that spead2 provides itself (TODO confirm).
stats_collector = StatsCollector(
    {
        "incomplete_heaps_evicted": ("input_incomplete_heaps", "number of heaps only partially received"),
        "too_old_heaps": ("input_too_old_heaps", "number of heaps that arrived too late to be processed"),
        "katgpucbf.metadata_heaps": ("input_metadata_heaps", "number of heaps not containing payload"),
        "katgpucbf.bad_timestamp_heaps": (
            "input_bad_timestamp_heaps",
            "timestamp not a multiple of samples per packet",
        ),
        "katgpucbf.bad_frequency_heaps": (
            "input_bad_frequency_heaps",
            "channel is not a multiple of channels per substream or is out of range",
        ),
    },
    namespace=METRIC_NAMESPACE,
)


#: C struct layout of the user data passed to the chunk placement callback
#: (it is the type of the callback's third argument).
user_data_type = types.Record.make_c_struct(
    [
        # Index for first custom statistic; the callback adds a _Statistic
        # value to this to locate the slot to increment.
        ("stats_base", types.size_t),
        # Which beam this is; make_stream_group stores the polarisation index here.
        ("pol", types.size_t),
    ]
)


class _Statistic(IntEnum):
    """Custom statistics for the SPEAD receiver.

    The chunk placement callback adds one of these values to the
    ``stats_base`` field of its user data to find the batch statistic to
    increment when it rejects a heap.
    """

    # Note: the values are important and must match the registration order
    # of the statistics (the ``stream_stats`` list in make_stream_group).
    # Heap with the wrong payload size or a negative item value
    METADATA_HEAPS = 0
    # Heap whose timestamp is not a multiple of the heap timestamp step
    BAD_TIMESTAMP_HEAPS = 1
    # Heap whose frequency is misaligned to a substream or is out of range
    BAD_FREQUENCY_HEAPS = 2


@dataclass(frozen=True)
class Layout(BaseLayout):
    """Parameters controlling the sizes of heaps and chunks.

    Parameters
    ----------
    sample_bits
        Bits per sample (for each of real and imaginary).
    n_channels
        Total number of channels.
    n_channels_per_substream
        The number of frequency channels in each beam substream.
    n_spectra_per_heap
        The number of samples on the time axis in each heap.
    n_batches_per_chunk
        Number of heaps per chunk on the time axis.
    heap_timestamp_step
        Increase in timestamp between successive heaps. Timestamps must also be
        a multiple of this value.

    Raises
    ------
    ValueError
        If any parameter is not strictly positive, if `sample_bits` is not a
        whole number of bytes, or if `n_channels` is not a multiple of
        `n_channels_per_substream`.
    """

    sample_bits: int
    n_channels: int
    n_channels_per_substream: int
    n_spectra_per_heap: int
    n_batches_per_chunk: int
    heap_timestamp_step: int

    def __post_init__(self) -> None:
        # Every parameter is used either as a divisor or as a size, both here
        # and in the compiled chunk placement callback (which has no means of
        # reporting a division by zero). Reject nonsensical values up front.
        for name in (
            "sample_bits",
            "n_channels",
            "n_channels_per_substream",
            "n_spectra_per_heap",
            "n_batches_per_chunk",
            "heap_timestamp_step",
        ):
            value = getattr(self, name)
            if value <= 0:
                raise ValueError(f"{name} must be positive (got {value})")
        # Could probably be relaxed in future, but would need to be investigated.
        if self.sample_bits % BYTE_BITS != 0:
            raise ValueError(f"sample_bits must be a multiple of {BYTE_BITS}")
        if self.n_channels % self.n_channels_per_substream != 0:
            raise ValueError(
                f"n_channels ({self.n_channels}) is not a multiple of "
                f"n_channels_per_substream ({self.n_channels_per_substream})"
            )

    @property
    def heap_bytes(self) -> int:  # noqa: D102
        # Samples per heap, times bits per sample (COMPLEX components of
        # sample_bits each), converted from bits to bytes.
        return self.n_channels_per_substream * self.n_spectra_per_heap * self.sample_bits * COMPLEX // BYTE_BITS

    @property
    def n_pol_substreams(self) -> int:
        """Number of substreams in each polarisation."""
        return self.n_channels // self.n_channels_per_substream

    @property
    def chunk_batches(self) -> int:  # noqa: D102
        return self.n_batches_per_chunk

    @property
    def batch_heaps(self) -> int:  # noqa: D102
        # One heap per substream for each polarisation
        return self.n_pol_substreams * N_POLS

    @property
    def chunk_timestamp_step(self) -> int:  # noqa: D102
        return self.heap_timestamp_step * self.n_batches_per_chunk

    @property
    def heap_sample_count(self) -> int:  # noqa: D102
        return self.n_spectra_per_heap * self.n_channels_per_substream

    @functools.cached_property
    def _chunk_place(self) -> numba.core.ccallback.CFunc:
        """Compiled callback that decides where each heap goes in a chunk.

        The callback rejects (and counts in the custom statistics) heaps that
        have the wrong payload size, a negative item value, a misaligned
        timestamp, or a misaligned or out-of-range frequency. For accepted
        heaps it sets the chunk ID, heap index and byte offset.
        """
        # Copy everything the callback needs into plain local integers so that
        # the compiled function does not have to reference `self`.
        heap_bytes = self.heap_bytes
        heap_timestamp_step = self.heap_timestamp_step
        n_channels = self.n_channels
        n_channels_per_substream = self.n_channels_per_substream
        n_batches_per_chunk = self.n_batches_per_chunk
        n_pol_substreams = self.n_pol_substreams
        n_heaps_per_pol = n_batches_per_chunk * n_pol_substreams
        n_statistics = len(_Statistic)

        @numba.cfunc(
            types.void(types.CPointer(chunk_place_data), types.uintp, types.CPointer(user_data_type)), nopython=True
        )
        def chunk_place_impl(data_ptr, data_size, user_data_ptr):
            # data_size is unused, but is part of the declared callback signature
            data = numba.carray(data_ptr, 1)
            user_data = numba.carray(user_data_ptr, 1)
            # View the statistics up to and including the custom ones, which
            # start at index stats_base.
            batch_stats = numba.carray(
                intp_to_voidptr(data[0].batch_stats),
                user_data[0].stats_base + n_statistics,
                dtype=np.uint64,
            )
            # Four item values per heap; presumably in the order in which the
            # item IDs were requested when the stream was created (TODO confirm).
            items = numba.carray(intp_to_voidptr(data[0].items), 4, dtype=np.int64)
            timestamp = items[0]
            frequency = items[1]
            beam_ants = items[2]  # TODO should this be used to flag bad data?
            payload_size = items[3]
            pol = user_data[0].pol
            if payload_size != heap_bytes or timestamp < 0 or frequency < 0 or beam_ants < 0:
                # Probably a metadata heap - ignore it
                batch_stats[user_data[0].stats_base + _Statistic.METADATA_HEAPS] += 1
                return
            if timestamp % heap_timestamp_step != 0:
                # Invalid timestamp
                batch_stats[user_data[0].stats_base + _Statistic.BAD_TIMESTAMP_HEAPS] += 1
                return
            if frequency % n_channels_per_substream != 0 or frequency >= n_channels:
                # Invalid frequency
                batch_stats[user_data[0].stats_base + _Statistic.BAD_FREQUENCY_HEAPS] += 1
                return
            # Heap index on the time axis, from timestamp 0
            heap_time_abs = timestamp // heap_timestamp_step
            data[0].chunk_id = heap_time_abs // n_batches_per_chunk
            # Position of this heap on the time axis, from the start of the chunk
            heap_time = heap_time_abs % n_batches_per_chunk
            # Position of this heap on the frequency axis
            heap_freq = frequency // n_channels_per_substream
            # Heaps are ordered by polarisation, then time, then frequency
            data[0].heap_index = pol * n_heaps_per_pol + heap_time * n_pol_substreams + heap_freq
            data[0].heap_offset = data[0].heap_index * heap_bytes

        return chunk_place_impl


def make_stream_group(
    layout: Layout,
    data_ringbuffer: spead2.recv.asyncio.ChunkRingbuffer,
    free_ringbuffer: spead2.recv.ChunkRingbuffer,
    recv_affinity: int,
    pol_labels: Sequence[str],
) -> spead2.recv.ChunkStreamRingGroup:
    """Build the stream group that receives dual-polarised beam data.

    No readers are attached to the streams by this function.

    Parameters
    ----------
    layout
        Heap size and chunking parameters.
    data_ringbuffer
        Output ringbuffer to which chunks will be sent.
    free_ringbuffer
        Ringbuffer for holding chunks for recycling once they've been used.
    recv_affinity
        CPU core affinity for the worker thread. Use -1 to indicate no affinity.
    pol_labels
        Prometheus labels to apply to the polarisations (must have length 2).

    Returns
    -------
    spead2.recv.ChunkStreamRingGroup
        The new group, already registered with the module's statistics
        collector.
    """
    assert len(pol_labels) == N_POLS
    # Touch every labelled counter so that it exists before the first scrape
    for label in pol_labels:
        counters.labels(label)

    # One user-data record per polarisation. Only ``pol`` is filled in here;
    # ``stats_base`` starts at zero and is presumably set by the base
    # implementation (TODO confirm).
    per_pol_data = np.zeros(N_POLS, dtype=user_data_type.dtype)
    per_pol_data["pol"] = np.arange(N_POLS)

    # Order matters: it must agree with the values of _Statistic
    custom_stats = [
        "katgpucbf.metadata_heaps",
        "katgpucbf.bad_timestamp_heaps",
        "katgpucbf.bad_frequency_heaps",
    ]
    # The callback reads the item values in this order
    item_ids = [TIMESTAMP_ID, FREQUENCY_ID, BEAM_ANTS_ID, spead2.HEAP_LENGTH_ID]
    # The same affinity is used for every polarisation
    affinities = [recv_affinity] * N_POLS

    stream_group = base_recv.make_stream_group(
        layout=layout,
        spead_items=item_ids,
        max_active_chunks=MAX_CHUNKS,
        data_ringbuffer=data_ringbuffer,
        free_ringbuffer=free_ringbuffer,
        affinity=affinities,
        stream_stats=custom_stats,
        user_data=per_pol_data,
        substreams=layout.n_pol_substreams,
        explicit_start=True,
    )
    stats_collector.add_stream_group(stream_group)
    return stream_group


def iter_chunks(
    ringbuffer: spead2.recv.asyncio.ChunkRingbuffer,
    layout: Layout,
    sensors: aiokatcp.SensorSet,
    time_converter: TimeConverter,
    pol_labels: Sequence[str],
) -> AsyncGenerator[Chunk, None]:
    """Yield chunks from the ringbuffer, updating sensors along the way.

    The chunk timestamp is populated as well. The work is delegated to the
    shared receiver implementation, using this module's counters and
    statistics collector.

    Parameters
    ----------
    ringbuffer
        Source of chunks.
    layout
        Structure of the streams.
    sensors
        Sensor set containing at least the sensors created by
        :func:`.make_sensors`.
    time_converter
        Converter to turn data timestamps into sensor timestamps.
    pol_labels
        Input polarisation labels (must match those passed to
        :func:`make_stream_group`).
    """
    # The shared implementation takes a pair of labels per polarisation; the
    # same label is used for both elements here.
    label_pairs = [(label, label) for label in pol_labels]
    return base_recv.iter_chunks(
        ringbuffer,
        layout,
        sensors,
        time_converter,
        label_pairs,
        counters,
        stats_collector,
    )