Source code for katgpucbf.main

################################################################################
# Copyright (c) 2020-2026, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Utilities for writing the main programs of engines."""

import argparse
import asyncio
import contextlib
import enum
import gc
import ipaddress
import logging
import os
import signal
import time
from collections.abc import Awaitable, Callable, MutableMapping
from dataclasses import dataclass
from typing import Any

import aiokatcp
import katsdpservices
import prometheus_async
import prometheus_client
from katsdpservices import get_interface_address
from katsdpservices.aiomonitor import add_aiomonitor_arguments, start_aiomonitor
from katsdptelstate.endpoint import endpoint_list_parser

from . import (
    DEFAULT_KATCP_HOST,
    DEFAULT_KATCP_PORT,
    DEFAULT_RECV_BUFFER_SIZE,
    DEFAULT_SEND_BUFFER_SIZE,
    DEFAULT_TTL,
    __version__,
    spead,
    utils,
)

logger = logging.getLogger(__name__)


[docs] def parse_enum[E: enum.Enum](name: str, value: str, cls: type[E]) -> E: """Parse a command-line argument into an enum type.""" table = {member.name.lower(): member for member in cls} try: return table[value] except KeyError: valid = ", ".join(repr(key) for key in table.keys()) raise argparse.ArgumentTypeError(f"invalid {name} value: {value!r} (valid values are {valid})") from None
def parse_dither(value: str) -> utils.DitherType:
    """Convert a command-line string into a :class:`utils.DitherType`.

    Raises :exc:`argparse.ArgumentTypeError` (via :func:`parse_enum`) if the
    string does not name a member.
    """
    # Only canonical member names are accepted; aliases (such as DEFAULT)
    # are not part of the lookup table built by parse_enum.
    dither = parse_enum("dither", value, utils.DitherType)
    return dither
def parse_source_ipv4(value: str, default_port=spead.DEFAULT_PORT) -> list[tuple[str, int]]:
    """Convert an endpoint-list string into ``(host, port)`` pairs.

    Every host must be a syntactically valid IPv4 address.

    Parameters
    ----------
    value
        The endpoint list, in the syntax accepted by
        :func:`katsdptelstate.endpoint.endpoint_list_parser`.
    default_port
        Port passed to the endpoint parser as its default.

    Raises
    ------
    ValueError
        If a host is not a valid IPv4 address (raised by
        :class:`ipaddress.IPv4Address`).
    """
    parse = endpoint_list_parser(default_port)
    endpoints = parse(value)
    # Validate every host before building the result. The constructed
    # address object is discarded; only the exception matters.
    for host in [endpoint.host for endpoint in endpoints]:
        ipaddress.IPv4Address(host)
    pairs = []
    for endpoint in endpoints:
        pairs.append((endpoint.host, endpoint.port))
    return pairs
def parse_source(value: str) -> list[tuple[str, int]] | str:
    """Interpret a string as either an IPv4 endpoint list or a file name.

    The endpoint-list interpretation is tried first. If that fails with
    :exc:`ValueError`, the string is returned unchanged provided it names an
    existing path.

    Raises
    ------
    ValueError
        If `value` is neither a valid endpoint list nor an existing path.
    """
    try:
        endpoints = parse_source_ipv4(value)
    except ValueError:
        pass
    else:
        return endpoints
    if not os.path.exists(value):
        raise ValueError(f"{value} is not an endpoint list or a filename") from None
    return value
[docs] def comma_split[T]( base_type: Callable[[str], T], count: int | None = None, allow_single=False ) -> Callable[[str], list[T]]: """Return a function to split a comma-delimited str into a list of type T. This function is used to parse lists of parameters which come from the command-line as comma-separated strings, but are obviously more useful as a list. Parameters ---------- base_type The base type of thing you expect in the list, e.g. `int`, `float`. count How many of them you expect to be in the list. `None` means the list could be any length. allow_single If true (defaults to false), allow a single value to be used when `count` is greater than 1. In this case, it will be repeated `count` times. """ def func(value: str) -> list[T]: parts = value.split(",") if parts == [""]: parts = [] n = len(parts) if count is not None and n == 1 and allow_single: parts = parts * count elif count is not None and n != count: raise argparse.ArgumentTypeError(f"Expected {count} comma-separated fields, received {n}") # Mirror argparse's logic for wrapping TypeError and ValueError, so # that the error reflects the list element that is invalid. out = [] for part in parts: try: part_value = base_type(part) except (TypeError, ValueError) as exc: base_name = getattr(base_type, "__name__", repr(base_type)) raise argparse.ArgumentTypeError(f"invalid {base_name} value: {part!r}") from exc else: out.append(part_value) return out return func
def add_common_arguments(
    parser: argparse.ArgumentParser,
    *,
    katcp: bool = True,
    prometheus: bool = True,
    aiomonitor: bool = True,
    version: bool = True,
) -> None:
    """Register the command-line options shared by the engine programs.

    Parameters
    ----------
    parser
        Parser to which the arguments are added.
    katcp
        If true, add ``--katcp-host`` and ``--katcp-port``.
    prometheus
        If true, add ``--prometheus-port`` (no default).
    aiomonitor
        If true, add the options provided by
        :func:`katsdpservices.aiomonitor.add_aiomonitor_arguments`.
    version
        If true, add ``--version``, which reports the package version.
    """
    if katcp:
        katcp_options = {
            "--katcp-host": dict(
                type=str,
                default=DEFAULT_KATCP_HOST,
                help="Hostname or IP address on which to listen for KATCP C&M connections [all interfaces]",
            ),
            "--katcp-port": dict(
                type=int,
                default=DEFAULT_KATCP_PORT,
                help="TCP port on which to listen for KATCP C&M connections [%(default)s]",
            ),
        }
        for flag, options in katcp_options.items():
            parser.add_argument(flag, **options)
    if prometheus:
        prometheus_help = "Network port on which to serve Prometheus metrics [none]"
        parser.add_argument("--prometheus-port", type=int, help=prometheus_help)
    if aiomonitor:
        add_aiomonitor_arguments(parser)
    if version:
        parser.add_argument("--version", action="version", version=__version__)
def add_time_converter_arguments(parser: argparse.ArgumentParser, sync_time_required: bool = True) -> None:
    """Register the options needed to construct a TimeConverter.

    Parameters
    ----------
    parser
        Parser to which the arguments are added.
    sync_time_required
        Whether ``--sync-time`` is mandatory. ``--adc-sample-rate`` is
        always mandatory.
    """
    sample_rate_options = dict(
        type=float,
        required=True,
        metavar="HZ",
        help="Digitiser sampling rate",
    )
    sync_time_options = dict(
        type=float,
        required=sync_time_required,
        help="UNIX time at which digitisers were synced",
    )
    parser.add_argument("--adc-sample-rate", **sample_rate_options)
    parser.add_argument("--sync-time", **sync_time_options)
def _multi_add_argument(multi: bool, parser: argparse.ArgumentParser, *args, **kwargs) -> None:
    """Add an argument to a parser in either singular or multi form.

    In the multi form the user passes a comma-separated list and the parsed
    value is a list. So that the help text suits either form, the `metavar`
    and `help` keyword arguments are passed through :meth:`str.format` with
    these substitutions:

    ====== ============= ==============
    Key    multi=False   multi=True
    ====== ============= ==============
    s      ""            "(s)"
    dots   ""            ",..."
    ====== ============= ==============

    A given default becomes a one-element list in the multi case. If no
    default is given, the default stays ``None`` regardless of `multi`.

    All other positional and keyword arguments are forwarded to
    :meth:`argparse.ArgumentParser.add_argument`.
    """
    if multi:
        kwargs["type"] = comma_split(kwargs.get("type", str))
        if "default" in kwargs:
            scalar_default = kwargs["default"]
            # Keep the list out of the help text. This is somewhat hacky: it
            # only copes with the default being spelled as "%(default)s",
            # because there is no easy way to do %-formatting with only one
            # token substituted.
            if "help" in kwargs:
                kwargs["help"] = kwargs["help"].replace("%(default)s", str(scalar_default))
            kwargs["default"] = [scalar_default]
    subst = {"s": "(s)", "dots": ",..."} if multi else {"s": "", "dots": ""}
    for key in ("metavar", "help"):
        if key not in kwargs:
            continue
        kwargs[key] = kwargs[key].format(**subst)
    parser.add_argument(*args, **kwargs)
def add_recv_arguments(parser: argparse.ArgumentParser, *, multi: bool = False) -> None:
    """Register the options for the receiving interface (supporting ibverbs).

    Parameters
    ----------
    parser
        Parser to which the arguments are added.
    multi
        If true, ``--recv-interface``, ``--recv-affinity`` and
        ``--recv-comp-vector`` take comma-separated lists. ``--recv-ibv``
        and ``--recv-buffer`` are always scalar.
    """

    def add_multi(*names: str, **options: Any) -> None:
        # Forward to the shared helper with this call's `multi` and `parser`.
        _multi_add_argument(multi, parser, *names, **options)

    add_multi(
        "--recv-interface",
        type=get_interface_address,
        metavar="IFACE{dots}",
        help="Name{s} of input network device{s}",
    )
    parser.add_argument("--recv-ibv", action="store_true", help="Use ibverbs for receiving [no]")
    add_multi(
        "--recv-affinity",
        type=int,
        metavar="CORE{dots}",
        default=-1,
        help="Core{s} for input-handling thread{s} [not bound]",
    )
    add_multi(
        "--recv-comp-vector",
        type=int,
        metavar="VECTOR{dots}",
        default=0,
        help="Completion vector{s} for source streams, or -1 for polling [0]",
    )
    buffer_options = dict(
        type=int,
        default=DEFAULT_RECV_BUFFER_SIZE,
        metavar="BYTES",
        help="Size of network receive buffer [128MiB]",
    )
    parser.add_argument("--recv-buffer", **buffer_options)
def add_send_arguments(
    parser: argparse.ArgumentParser,
    *,
    prefix: str = "send-",
    multi: bool = False,
    ibverbs: bool = True,
    affinity: bool = True,
    rate_factor: bool = True,
) -> None:
    """Register the options for the sending interface.

    Parameters
    ----------
    parser
        Parser to which arguments are added.
    prefix
        Prefix placed after ``--`` on the option names.
    multi
        If true, the interface option takes a comma-separated list.
    ibverbs
        If true, add the ``ibv`` and ``comp-vector`` options.
    affinity
        If true, add the ``affinity`` option for binding the
        output-handling thread to a core.
    rate_factor
        If true, add the ``rate-factor`` option.
    """
    # NOTE(review): unlike every other option here, --send-buffer is not
    # built from `prefix`, so it is always spelled "--send-buffer".
    if affinity:
        affinity_options = dict(
            type=int,
            default=-1,
            metavar="CORE",
            help="Core for output-handling thread [not bound]",
        )
        parser.add_argument(f"--{prefix}affinity", **affinity_options)
    interface_options = dict(
        type=get_interface_address,
        required=True,
        metavar="IFACE{dots}",
        help="Name{s} of output network device{s}",
    )
    _multi_add_argument(multi, parser, f"--{prefix}interface", **interface_options)
    parser.add_argument(
        f"--{prefix}ttl",
        type=int,
        default=DEFAULT_TTL,
        metavar="TTL",
        help="TTL for outgoing packets [%(default)s]",
    )
    buffer_options = dict(
        type=int,
        default=DEFAULT_SEND_BUFFER_SIZE,
        metavar="BYTES",
        help="Size of network send buffer [1MiB]",
    )
    parser.add_argument("--send-buffer", **buffer_options)
    if ibverbs:
        parser.add_argument(f"--{prefix}ibv", action="store_true", help="Use ibverbs for output [no]")
        comp_vector_options = dict(
            type=int,
            default=0,
            metavar="VECTOR",
            help="Completion vector for transmission, or -1 for polling [%(default)s]",
        )
        parser.add_argument(f"--{prefix}comp-vector", **comp_vector_options)
    if rate_factor:
        rate_factor_options = dict(
            type=float,
            default=1.1,
            metavar="FACTOR",
            help="Target transmission rate faster than ADC sample rate by this factor. "
            "Set to zero to send as fast as possible. [%(default)s]",
        )
        parser.add_argument(f"--{prefix}rate-factor", **rate_factor_options)
class SubParser:
    """Parser for one command-line argument that carries sub-options.

    Sub-options are written as ``key=value`` pairs separated by commas. This
    is far less complete than a full argparse parser, but covers the basics:

    - required arguments
    - defaults
    - types

    Actions (such as ``store_true``) are not supported, and giving the same
    key more than once is an error.

    Instances are callable, so one can be passed directly as the `type`
    argument of :meth:`argparse.ArgumentParser.add_argument`.
    """

    @dataclass
    class _Argument:
        # Converter applied to the text after the "=".
        type: Callable[[str], Any]
        # Whether omitting the key is an error.
        required: bool
        # Value stored when an optional key is omitted.
        default: Any

    def __init__(self) -> None:
        self._arguments: dict[str, SubParser._Argument] = {}

    def add_argument(
        self,
        name: str,
        *,
        type: Callable[[str], Any] = str,
        required: bool = False,
        default: Any = None,
    ) -> None:
        """Register a sub-option.

        Parameters
        ----------
        name
            Key that the user writes before the ``=``.
        type
            Converter applied to the text after the ``=``.
        required
            If true, parsing fails when the key is absent.
        default
            Value used when an optional key is absent.

        Raises
        ------
        ValueError
            If `name` has already been registered.
        """
        if name in self._arguments:
            raise ValueError(f"argument {name} already exists")
        spec = self._Argument(type=type, required=required, default=default)
        self._arguments[name] = spec

    def __call__(self, value: str) -> argparse.Namespace:
        """Parse `value` into a namespace with one attribute per sub-option.

        Raises
        ------
        argparse.ArgumentTypeError
            If a part has no ``=``, a key is unknown or repeated, a value
            cannot be converted, or a required key is absent.
        """
        ans = argparse.Namespace()
        for part in value.split(","):
            # Split on the first "=" only, so the data may itself contain "=".
            key, equals, data = part.partition("=")
            if not equals:
                raise argparse.ArgumentTypeError(f"missing = in {part!r}")
            arg = self._arguments.get(key)
            if arg is None:
                raise argparse.ArgumentTypeError(f"unknown key {key}")
            if key in ans:
                raise argparse.ArgumentTypeError(f"{key} already specified")
            try:
                converted = arg.type(data)
            except (TypeError, ValueError) as exc:
                type_name = getattr(arg.type, "__name__", str(arg.type))
                raise argparse.ArgumentTypeError(f"{key}: invalid {type_name} value: {data!r}") from exc
            except argparse.ArgumentTypeError as exc:
                # The converter produced its own message; prefix it with the key.
                raise argparse.ArgumentTypeError(f"{key}: {exc}") from exc
            setattr(ans, key, converted)
        # Fill in anything the user left out.
        for key, arg in self._arguments.items():
            if key in ans:
                continue
            if arg.required:
                raise argparse.ArgumentTypeError(f"{key} is missing")
            setattr(ans, key, arg.default)
        return ans
def add_signal_handlers(server: aiokatcp.DeviceServer) -> None:
    """Halt `server` when SIGINT (Ctrl-C) or SIGTERM is received.

    Must be called while an event loop is running, since the handlers are
    installed on the running loop.
    """
    loop = asyncio.get_running_loop()
    signums = (signal.SIGINT, signal.SIGTERM)

    def on_signal() -> None:
        logger.info("Received signal, shutting down")
        # Uninstall both handlers first, so that if shutdown stalls, a
        # second signal is no longer intercepted here and will try harder.
        for signum in signums:
            loop.remove_signal_handler(signum)
        server.halt()

    for signum in signums:
        loop.add_signal_handler(signum, on_signal)
def add_gc_stats() -> None:
    """Register a Prometheus histogram of garbage collection times.

    A callback is appended to :data:`gc.callbacks` that observes the
    duration of each collection, labelled by generation.

    It is only safe to call this once.
    """
    histogram = prometheus_client.Histogram(
        "python_gc_time_seconds",
        "Time spent in garbage collection",
        buckets=[0.0002, 0.0005, 0.001, 0.002, 0.005, 0.010, 0.020, 0.050, 0.100],
        labelnames=["generation"],
    )
    # Create the labelled children up front, so the metrics exist before
    # any collection has happened.
    for generation in range(3):
        histogram.labels(str(generation))

    start_time = 0.0

    def on_gc(phase: str, info: dict) -> None:
        nonlocal start_time
        if phase == "start":
            start_time = time.monotonic()
            return
        # Take a copy first thing, before any more GC can happen.
        started = start_time
        elapsed = time.monotonic() - started
        histogram.labels(str(info["generation"])).observe(elapsed)

    gc.callbacks.append(on_gc)
async def _engine_main_async(
    args: argparse.Namespace,
    start_engine: Callable[
        [argparse.Namespace, asyncio.TaskGroup, contextlib.AsyncExitStack, MutableMapping[str, object]],
        Awaitable[aiokatcp.DeviceServer],
    ],
) -> None:
    """Set up the process-wide services, start the engine and wait for it.

    Parameters
    ----------
    args
        The command-line arguments. The ``prometheus_port`` and
        ``aiomonitor`` attributes are optional; each feature is skipped if
        its attribute is absent.
    start_engine
        Coroutine function that is awaited with `args`, the task group, the
        exit stack and the aiomonitor locals dictionary, and returns the
        server to run.
    """
    katsdpservices.setup_logging()
    add_gc_stats()
    # Dictionary handed to both aiomonitor and start_engine.
    locals_: dict[str, object] = {}
    # We enter the task group separately from the exit_stack, so that we
    # always wait for the task group to complete before performing any
    # of the exit_stack cleanup.
    async with contextlib.AsyncExitStack() as exit_stack, asyncio.TaskGroup() as tg:
        # Only serve metrics if a port was given on the command line.
        if getattr(args, "prometheus_port", None) is not None:
            prometheus_server = await prometheus_async.aio.web.start_http_server(port=args.prometheus_port)
            exit_stack.push_async_callback(prometheus_server.close)
        if getattr(args, "aiomonitor", False):
            exit_stack.enter_context(start_aiomonitor(asyncio.get_running_loop(), args, locals_))
        engine = await start_engine(args, tg, exit_stack, locals_)
        add_signal_handlers(engine)
        # Avoid garbage collections needing to iterate over all the objects
        # allocated so far. That makes garbage collection much faster, and we
        # don't expect to free up much of what's currently allocated.
        gc.collect()  # Free up any current garbage before it gets frozen
        gc.freeze()
        exit_stack.callback(gc.unfreeze)
        # The task group keeps this block open until the engine's join() finishes.
        tg.create_task(engine.join())
def engine_main(
    args: argparse.Namespace,
    start_engine: Callable[
        [argparse.Namespace, asyncio.TaskGroup, contextlib.AsyncExitStack, MutableMapping],
        Awaitable[aiokatcp.DeviceServer],
    ],
) -> None:
    """Run an engine to completion.

    This takes care of:

    - running an event loop;
    - setting up logging;
    - running a web server for Prometheus scraping, if requested on the
      command line;
    - running aiomonitor, if requested on the command line;
    - adding Prometheus statistics for the garbage collector (GC);
    - freezing the GC after starting the engine and unfreezing it on
      shutdown.

    Parameters
    ----------
    args
        The command-line arguments.
    start_engine
        The function that sets up and starts the engine. It is called with:

        - the command-line arguments (`args`);
        - a task group that can be used to schedule on-going work that will
          be waited on before shutting down;
        - an asynchronous exit stack that can be used to enter contexts or
          schedule cleanup work;
        - a dictionary of variables to expose to aiomonitor.
    """
    main_coro = _engine_main_async(args, start_engine)
    asyncio.run(main_coro)