Source code for katgpucbf.xbgpu.recv

################################################################################
# Copyright (c) 2020-2025, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""SPEAD receiver utilities."""

import functools
import logging
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from enum import IntEnum

import numba
import numpy as np
import spead2.recv.asyncio
import spead2.send.asyncio
from aiokatcp import SensorSet
from numba import types
from prometheus_client import Counter
from spead2.numba import intp_to_voidptr
from spead2.recv.numba import chunk_place_data

from .. import BYTE_BITS, COMPLEX, N_POLS
from .. import recv as base_recv
from ..recv import BaseLayout, Chunk, Counters, StatsCollector, user_data_type
from ..spead import FENG_ID_ID, TIMESTAMP_ID
from ..utils import TimeConverter
from . import METRIC_NAMESPACE

logger = logging.getLogger(__name__)

counters = Counters(
    heaps=Counter("input_heaps", "number of heaps received", namespace=METRIC_NAMESPACE),
    chunks=Counter("input_chunks", "number of chunks received", namespace=METRIC_NAMESPACE),
    bytes=Counter("input_bytes", "number of bytes of input data received", namespace=METRIC_NAMESPACE),
    missing_heaps=Counter("input_missing_heaps", "number of heaps dropped on the input", namespace=METRIC_NAMESPACE),
    samples=Counter("input_samples", "number of complex samples received", namespace=METRIC_NAMESPACE),
)

stats_collector = StatsCollector(
    {
        "incomplete_heaps_evicted": ("input_incomplete_heaps", "number of heaps only partially received"),
        "too_old_heaps": ("input_too_old_heaps", "number of heaps that arrived too late to be processed"),
        "katgpucbf.metadata_heaps": ("input_metadata_heaps", "number of heaps not containing payload"),
        "katgpucbf.bad_timestamp_heaps": (
            "input_bad_timestamp_heaps",
            "timestamp is not a multiple of expected step size",
        ),
        "katgpucbf.bad_feng_id_heaps": ("input_bad_feng_id_heaps", "fengine ID is out of range"),
    },
    namespace=METRIC_NAMESPACE,
)


class _Statistic(IntEnum):
    """Custom statistics for the SPEAD receiver."""

    # NOTE: the values are important and must match the registration order
    # of the statistics.
    METADATA_HEAPS = 0
    BAD_TIMESTAMP_HEAPS = 1
    BAD_FENG_ID_HEAPS = 2


[docs] @dataclass(frozen=True) class Layout(BaseLayout): """Parameters controlling the sizes of heaps and chunks. Parameters ---------- n_ants The number of antennas that data will be received from n_channels_per_substream The number of frequency channels contained in the stream. n_spectra_per_heap The number of time samples received per frequency channel. heap_timestamp_step Each heap contains a timestamp. The timestamp between consecutive heaps changes depending on the FFT size and the number of time samples per channel. This parameter defines the difference in timestamp values between consecutive heaps. This parameter can be calculated from the array configuration parameters for power-of-two array sizes, but is configurable to allow for greater flexibility during testing. sample_bits The number of bits per sample. Only 8 bits is supported at the moment. heaps_per_fengine_per_chunk Each chunk out of the SPEAD2 receiver will contain multiple heaps from each antenna. This parameter specifies the number of heaps per antenna that each chunk will contain. """ n_ants: int n_channels_per_substream: int n_spectra_per_heap: int heap_timestamp_step: int sample_bits: int heaps_per_fengine_per_chunk: int @property def heap_bytes(self): # noqa: D102 return ( self.n_channels_per_substream * self.n_spectra_per_heap * N_POLS * COMPLEX * self.sample_bits // BYTE_BITS ) @property def chunk_batches(self) -> int: # noqa: D102 return self.heaps_per_fengine_per_chunk @property def batch_heaps(self) -> int: # noqa: D102 return self.n_ants @property def heap_sample_count(self) -> int: # noqa: D102 return self.n_channels_per_substream * self.n_spectra_per_heap @property def chunk_timestamp_step(self) -> int: # noqa: D102 return self.heap_timestamp_step * self.heaps_per_fengine_per_chunk @functools.cached_property def _chunk_place(self) -> numba.core.ccallback.CFunc: n_ants = self.n_ants heap_timestamp_step = self.heap_timestamp_step heaps_per_fengine_per_chunk = self.heaps_per_fengine_per_chunk heap_bytes = self.heap_bytes n_statistics = len(_Statistic) @numba.cfunc( types.void(types.CPointer(chunk_place_data), types.uintp, types.CPointer(user_data_type)), nopython=True ) def chunk_place_impl(data_ptr, data_size, user_data_ptr): data = numba.carray(data_ptr, 1) user_data = numba.carray(user_data_ptr, 1) batch_stats = numba.carray( intp_to_voidptr(data[0].batch_stats), user_data[0].stats_base + n_statistics, dtype=np.uint64, ) items = numba.carray(intp_to_voidptr(data[0].items), 3, dtype=np.int64) timestamp = items[0] fengine_id = items[1] payload_size = items[2] if payload_size != heap_bytes or timestamp < 0 or fengine_id < 0: # It's something unexpected - possibly descriptors. Ignore it. batch_stats[user_data[0].stats_base + _Statistic.METADATA_HEAPS] += 1 return if timestamp % heap_timestamp_step != 0: # Invalid timestamp batch_stats[user_data[0].stats_base + _Statistic.BAD_TIMESTAMP_HEAPS] += 1 return if fengine_id >= n_ants: # Invalid F-engine ID batch_stats[user_data[0].stats_base + _Statistic.BAD_FENG_ID_HEAPS] += 1 return # Compute position of this heap on the time axis, starting from # timestamp 0 heap_time_abs = timestamp // heap_timestamp_step data[0].chunk_id = heap_time_abs // heaps_per_fengine_per_chunk # Position of this heap on the time axis, from the start of the chunk heap_time = heap_time_abs % heaps_per_fengine_per_chunk data[0].heap_index = heap_time * n_ants + fengine_id data[0].heap_offset = data[0].heap_index * heap_bytes return chunk_place_impl
[docs] def make_stream( layout: Layout, data_ringbuffer: spead2.recv.asyncio.ChunkRingbuffer, free_ringbuffer: spead2.recv.ChunkRingbuffer, recv_affinity: int, max_active_chunks: int, ) -> spead2.recv.ChunkRingStream: """Create a SPEAD receiver stream. Helper function with XB-engine-specific logic in it. Parameters ---------- layout Heap size and chunking parameters. data_ringbuffer Output ringbuffer to which chunks will be sent. free_ringbuffer Ringbuffer for holding chunks for recycling once they've been used. recv_affinity CPU core affinity for the worker thread. max_active_chunks Maximum number of chunks under construction. """ user_data = np.zeros(1, dtype=user_data_type.dtype) stream = base_recv.make_stream( layout=layout, spead_items=[TIMESTAMP_ID, FENG_ID_ID, spead2.HEAP_LENGTH_ID], max_active_chunks=max_active_chunks, data_ringbuffer=data_ringbuffer, free_ringbuffer=free_ringbuffer, affinity=recv_affinity, stream_stats=["katgpucbf.metadata_heaps", "katgpucbf.bad_timestamp_heaps", "katgpucbf.bad_feng_id_heaps"], substreams=layout.n_ants, stop_on_stop_item=False, # By default, a heap containing a stream control stop item will terminate the stream user_data=user_data, ) stats_collector.add_stream(stream) return stream
[docs] def iter_chunks( ringbuffer: spead2.recv.asyncio.ChunkRingbuffer, layout: Layout, sensors: SensorSet, time_converter: TimeConverter, ) -> AsyncGenerator[Chunk, None]: """Retrieve chunks from the ringbuffer, updating metrics as they are received. The returned chunks are yielded from this asynchronous generator. Parameters ---------- ringbuffer Source of chunks. layout Structure of the stream. sensors Sensor set containing at least the sensors created by :func:`.make_sensors`. time_converter Converter to turn data timestamps into sensor timestamps. """ return base_recv.iter_chunks( ringbuffer, layout, sensors, time_converter, None, counters, stats_collector, )