Source code for katgpucbf.xbgpu.main

################################################################################
# Copyright (c) 2020-2026, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""
Module to launch the XB-Engine.

This module parses all command line arguments required to configure the
XB-Engine and creates an XBEngine object. The XBEngine object then manages
everything required to run the XB-Engine.

.. todo::
    - Checks need to be put in place to ensure the command line parameters are
      correct:

        - Is the port number valid, is the IP address a multicast address, is
          the array size >0, etc.
"""

import argparse
import asyncio
import contextlib
import logging
import os
from collections.abc import MutableMapping, Sequence

import katsdpsigproc.accel
import vkgdr
from katsdpsigproc.abc import AbstractContext
from katsdptelstate.endpoint import endpoint_parser

from katgpucbf.xbgpu.engine import XBEngine

from .. import (
    DEFAULT_JONES_PER_BATCH,
    DEFAULT_PACKET_PAYLOAD_BYTES,
)
from ..main import (
    SubParser,
    add_common_arguments,
    add_recv_arguments,
    add_send_arguments,
    add_time_converter_arguments,
    engine_main,
    parse_dither,
    parse_source_ipv4,
)
from ..mapped_array import make_vkgdr
from ..monitor import FileMonitor, Monitor, NullMonitor
from ..spead import DEFAULT_PORT
from ..utils import DitherType
from . import DEFAULT_RECV_REORDER_TOL
from .correlation import device_filter
from .output import BOutput, XOutput

logger = logging.getLogger(__name__)


def _add_stream_args(parser: SubParser) -> None:
    parser.add_argument("name", type=str, required=True)
    parser.add_argument("dst", type=endpoint_parser(DEFAULT_PORT), required=True)


def _parse_pol(value: str) -> int:
    pol = int(value)
    if pol not in {0, 1}:
        raise argparse.ArgumentTypeError("pol must be either 0 or 1")
    return pol


[docs] def parse_beam(value: str) -> BOutput: """Parse a string with beam configuration data. The string has a comma-separated list of key=value pairs. See :class:`BOutput` for the valid keys and types. The following keys are required: - name - dst - pol """ parser = SubParser() _add_stream_args(parser) parser.add_argument("pol", type=_parse_pol, required=True) parser.add_argument("dither", type=parse_dither, default=DitherType.DEFAULT) args = parser(value) return BOutput(**vars(args))
[docs] def parse_corrprod(value: str) -> XOutput: """Parse a string with correlation product configuration data. The string has comma-separated list of key=value pairs. See :class:`XOutput` for the valid keys and types. The following keys are required: - name - dst - heap_accumulation_threshold """ parser = SubParser() _add_stream_args(parser) parser.add_argument("heap_accumulation_threshold", type=int, required=True) args = parser(value) return XOutput(**vars(args))
[docs] def parse_args(arglist: Sequence[str] | None = None) -> argparse.Namespace: """Parse all command line parameters for the XB-Engine and ensure that they are valid.""" parser = argparse.ArgumentParser(description="Launch an XB-Engine for a single multicast stream.") parser.add_argument( "--beam", type=parse_beam, default=[], action="append", metavar="KEY=VALUE[,KEY=VALUE...]", help="Add a half-beam output (may be repeated). The required keys are: name, dst, pol. Optional keys: dither.", ) parser.add_argument( "--corrprod", type=parse_corrprod, default=[], action="append", metavar="KEY=VALUE[,KEY=VALUE...]", help="Add a baseline-correlation-products output (may be repeated). The required keys are: " "name, dst, heap_accumulation_threshold.", ) add_common_arguments(parser) add_time_converter_arguments(parser) parser.add_argument( "--bandwidth", type=float, help="Total bandwidth (Hz) of all channels (--channels) [computed assuming critically-sampled PFB].", ) parser.add_argument("--array-size", type=int, required=True, help="Number of antennas in the array.") parser.add_argument( "--channels", type=int, required=True, help="Total number of channels in the stream.", ) parser.add_argument( "--channels-per-substream", type=int, required=True, help="Number of channels in the multicast stream that this engine receives data from.", ) parser.add_argument( "--samples-between-spectra", type=int, required=True, help="Number of samples between spectra.", ) parser.add_argument( "--channel-offset-value", type=int, default=0, help="Index of the first channel in the subset of channels processed " "by this XB-Engine. Used to set the value in the XB-Engine " "output heaps for spectrum reassembly by the downstream receiver. " "[%(default)s]", ) parser.add_argument( "--jones-per-batch", type=int, default=DEFAULT_JONES_PER_BATCH, help="Number of antenna-channelised-voltage Jones vectors in each F-engine batch. [%(default)s]", ) parser.add_argument( "--sample-bits", type=int, default=8, choices=[8], help="Number of bits for each real and imaginary value in a sample. [%(default)s]", ) parser.add_argument( "--heaps-per-fengine-per-chunk", type=int, default=32, help="A batch is a collection of heaps from different F-Engines with " "the same timestamp. This parameter specifies the number of " "consecutive batches to store in the same chunk. The higher this " "value is, the more GPU and system RAM is allocated, the lower " "this value is, the more work the python processing thread " "is required to do. [%(default)s]", ) parser.add_argument( "--recv-reorder-tol", type=int, default=DEFAULT_RECV_REORDER_TOL, help="Maximum time (in ADC ticks) that packets can be delayed relative to others " "and still be accepted. [%(default)s]", ) add_recv_arguments(parser, multi=False) add_send_arguments(parser, multi=False) parser.add_argument( "--send-packet-payload", type=int, default=DEFAULT_PACKET_PAYLOAD_BYTES, help="Size in bytes for output packets (payload only) [%(default)s]", ) parser.add_argument( "--send-enabled", action="store_true", help="Start with correlator output transmission enabled, without having to issue a katcp command.", ) parser.add_argument("--monitor-log", type=str, help="File to write performance-monitoring data to") parser.add_argument("src", type=parse_source_ipv4, help="Multicast address data is received from.") args = parser.parse_args(arglist) if args.jones_per_batch % args.channels != 0: parser.error(f"--jones-per-batch ({args.jones_per_batch}) must be a multiple of --channels ({args.channels})") if args.bandwidth is None: args.bandwidth = args.adc_sample_rate / args.samples_between_spectra * args.channels used_names = set() args.outputs = [] for output_group in [args.corrprod, args.beam]: for output in output_group: name = output.name if name in used_names: parser.error(f"output name {name} already used.") used_names.add(name) args.outputs.append(output) return args
[docs] def make_engine( context: AbstractContext, vkgdr_handle: vkgdr.Vkgdr, args: argparse.Namespace ) -> tuple[XBEngine, Monitor]: """Make an :class:`XBEngine` object, given a GPU context. Parameters ---------- context The GPU context in which the :class:`.XBEngine` will operate. vkgdr_handle Handle to vkgdr for the same device as `context`. args Parsed arguments returned from :func:`parse_args`. """ monitor: Monitor if args.monitor_log is not None: monitor = FileMonitor(filename=args.monitor_log) else: monitor = NullMonitor() logger.info("Initialising XB-Engine on %s", context.device.name) xbengine = XBEngine( katcp_host=args.katcp_host, katcp_port=args.katcp_port, adc_sample_rate=args.adc_sample_rate, bandwidth=args.bandwidth, send_rate_factor=args.send_rate_factor, n_ants=args.array_size, n_channels=args.channels, n_channels_per_substream=args.channels_per_substream, n_samples_between_spectra=args.samples_between_spectra, n_spectra_per_heap=args.jones_per_batch // args.channels, sample_bits=args.sample_bits, sync_time=args.sync_time, channel_offset_value=args.channel_offset_value, outputs=args.outputs, src=args.src, recv_interface=args.recv_interface, recv_ibv=args.recv_ibv, recv_affinity=args.recv_affinity, recv_comp_vector=args.recv_comp_vector, recv_buffer=args.recv_buffer, send_interface=args.send_interface, send_buffer=args.send_buffer, send_ttl=args.send_ttl, send_ibv=args.send_ibv, send_packet_payload=args.send_packet_payload, send_affinity=args.send_affinity, send_comp_vector=args.send_comp_vector, heaps_per_fengine_per_chunk=args.heaps_per_fengine_per_chunk, recv_reorder_tol=args.recv_reorder_tol, send_enabled=args.send_enabled, monitor=monitor, context=context, vkgdr_handle=vkgdr_handle, ) return xbengine, monitor
[docs] async def start_engine( args: argparse.Namespace, tg: asyncio.TaskGroup, exit_stack: contextlib.AsyncExitStack, locals_: MutableMapping[str, object], ) -> XBEngine: """Create and launch the XB-Engine. Attach the ibverbs sender transport to the XBEngine object and then tell the object to launch all its internal asyncio functions. See Also -------- katgpucbf.main.engine_main """ context = katsdpsigproc.accel.create_some_context(device_filter=device_filter) vkgdr_handle = make_vkgdr(context.device) xbengine, monitor = make_engine(context, vkgdr_handle, args) exit_stack.enter_context(monitor) # katsdpcontroller launches us with real-time scheduling, but we don't # want that for the main Python thread since it can starve the # latency-sensitive network threads. os.sched_setscheduler(0, os.SCHED_OTHER, os.sched_param(0)) logger.info("Starting main processing loop") locals_.update(locals()) await xbengine.start() return xbengine
[docs] def main() -> None: """Run the XB-engine.""" engine_main(parse_args(), start_engine)
if __name__ == "__main__": main()