Source code for katgpucbf.vgpu.main

################################################################################
# Copyright (c) 2025-2026 National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""vgpu main script."""

import argparse
import asyncio
import contextlib
import functools
import re
from collections.abc import MutableMapping, Sequence

import aiokatcp
import katcbf_vlbi_resample.polarisation

from .. import DEFAULT_JONES_PER_BATCH
from ..main import (
    add_common_arguments,
    add_recv_arguments,
    add_send_arguments,
    add_time_converter_arguments,
    comma_split,
    engine_main,
    parse_source_ipv4,
)
from ..monitor import FileMonitor, Monitor, NullMonitor
from .engine import CaptureConfig, RecvConfig, SendConfig, VEngine

VTP_DEFAULT_PORT = 52030
_ARGUMENT_PARSER = argparse.ArgumentParser  # Modified by unit tests


[docs] def parse_args(arglist: Sequence[str] | None = None) -> argparse.Namespace: """Parse the command-line arguments.""" parser = _ARGUMENT_PARSER(prog="vgpu") add_common_arguments(parser) add_recv_arguments(parser) add_send_arguments(parser, ibverbs=False, multi=True, affinity=False) add_time_converter_arguments(parser) parser.add_argument("--recv-channels", type=int, metavar="CHANNELS", required=True, help="Number of input channels") parser.add_argument( "--recv-channels-per-substream", type=int, metavar="CHANNELS", required=True, help="Number of input channels in heap", ) parser.add_argument( "--recv-jones-per-batch", type=int, default=DEFAULT_JONES_PER_BATCH, help="Number of tied-array-channelised-voltage Jones vectors in each batch. [%(default)s]", ) parser.add_argument( "--recv-samples-between-spectra", type=int, metavar="SAMPLES", required=True, help="Timestamp increment between spectra", ) parser.add_argument( "--recv-batches-per-chunk", type=int, metavar="BATCHES", default=8, help="Number of batches per input chunk" ) parser.add_argument( "--recv-sample-bits", type=int, metavar="BITS", default=8, choices=[8], help="Number of bits in each real sample [%(default)s]", ) parser.add_argument( "--recv-pols", type=comma_split(str, 2), metavar="[+-]P,[+-]P", required=True, help="Input polarisations (±x, ±y, ±L or ±R)", ) parser.add_argument("--send-bandwidth", type=float, metavar="HZ", required=True, help="Output bandwidth") parser.add_argument( "--send-pols", type=comma_split(str, 2), metavar="P,P", required=True, help="Output polarisations (x, y, L or R)", ) parser.add_argument( "--send-samples-per-frame", type=int, metavar="SAMPLES", default=20000, help="Samples per VDIF frame [%(default)s]", ) parser.add_argument("--send-station", type=str, metavar="ID", required=True, help="VDIF Station ID") parser.add_argument("--fir-taps", type=int, required=True, metavar="TAPS", help="Number of taps in rational filter") parser.add_argument( "--hilbert-taps", type=int, default=201, metavar="TAPS", help="Number of taps in Hilbert filter [%(default)s]" ) parser.add_argument( "--passband", type=float, default=0.9, metavar="FRACTION", help="Fraction of band to retain in passband filter [%(default)s]", ) parser.add_argument( "--threshold", type=float, default=0.969, metavar="SIGMA", help="Threshold (in σ) between quantisation levels [%(default)s]", ) parser.add_argument( "--power-int-time", type=int, default=1, metavar="SECONDS", help="Time (in seconds) over which to normalise power [%(default)s]", ) parser.add_argument("--monitor-log", help="File to write performance-monitoring data to") parser.add_argument("src", type=parse_source_ipv4, nargs=2, help="Source endpoints") parser.add_argument( "dst", type=functools.partial(parse_source_ipv4, default_port=VTP_DEFAULT_PORT), help="Destination endpoints" ) args = parser.parse_args(arglist) if args.recv_channels % args.recv_channels_per_substream != 0: parser.error( f"--recv-channels ({args.recv_channels}) " f"must be a multiple of --recv-channels-per-substream ({args.recv_channels_per_substream})" ) if args.recv_jones_per_batch % args.recv_channels != 0: parser.error( f"--recv-jones-per-batch ({args.recv_jones_per_batch}) " f"must be a multiple of --recv-channels ({args.recv_channels})" ) for pol in args.recv_pols: if not re.fullmatch(r"^[-+]?[xyLR]", pol): parser.error(f"{pol!r} is not a valid --recv-pols value") if set(pol[-1] for pol in args.recv_pols) not in [{"x", "y"}, {"L", "R"}]: parser.error(f"argument: --recv-pols: polarisations {','.join(args.recv_pols)} do not form an orthogonal basis") for pol in args.send_pols: if pol not in ["x", "y", "L", "R"]: parser.error(f"{pol!r} is not a valid --send-pols value") try: # Return value is discarded; called just for error checking katcbf_vlbi_resample.polarisation.to_linear(args.send_pols) except ValueError as exc: parser.error(f"argument --send-pols: {exc}") if args.power_int_time != 1: # TODO implement this: it will probably need changes in katcbf-vlbi-resample parser.error("--power-int-time is not yet implemented for non-default values") return args
[docs] def make_engine(args: argparse.Namespace) -> VEngine: """Create the :class:`.VEngine`.""" monitor: Monitor if args.monitor_log is not None: monitor = FileMonitor(args.monitor_log) else: monitor = NullMonitor() recv_config = RecvConfig( sync_time=args.sync_time, adc_sample_rate=args.adc_sample_rate, n_channels=args.recv_channels, n_channels_per_substream=args.recv_channels_per_substream, n_spectra_per_heap=args.recv_jones_per_batch // args.recv_channels, n_samples_between_spectra=args.recv_samples_between_spectra, n_batches_per_chunk=args.recv_batches_per_chunk, sample_bits=args.recv_sample_bits, srcs=args.src, interface=args.recv_interface, ibv=args.recv_ibv, affinity=args.recv_affinity, comp_vector=args.recv_comp_vector, buffer_size=args.recv_buffer, pols=tuple(args.recv_pols), ) send_config = SendConfig( pols=tuple(args.send_pols), bandwidth=args.send_bandwidth, n_samples_per_frame=args.send_samples_per_frame, station=args.send_station, rate_factor=args.send_rate_factor, dsts=args.dst, interfaces=args.send_interface, buffer_size=args.send_buffer, ttl=args.send_ttl, ) config = CaptureConfig( recv_config=recv_config, send_config=send_config, fir_taps=args.fir_taps, hilbert_taps=args.hilbert_taps, passband=args.passband, threshold=args.threshold, power_int_time=args.power_int_time, ) return VEngine( katcp_host=args.katcp_host, katcp_port=args.katcp_port, config=config, monitor=monitor, )
[docs] async def start_engine( args: argparse.Namespace, tg: asyncio.TaskGroup, exit_stack: contextlib.AsyncExitStack, locals_: MutableMapping[str, object], ) -> aiokatcp.DeviceServer: """Start the V-engine asynchronously. See Also -------- katgpucbf.main.engine_main """ engine = make_engine(args) locals_.update(locals()) await engine.start() return engine
[docs] def main(): """Run the V-engine.""" engine_main(parse_args(), start_engine)
if __name__ == "__main__": main()