################################################################################
# Copyright (c) 2024-2025, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Simulate channelised data from the MeerKAT F-Engines destined for one or more XB-Engines.
Refer to :ref:`feng-packet-sim` for more information.
"""
import argparse
import asyncio
import functools
import itertools
import os
import time
from collections.abc import Sequence
import katsdpservices
import numpy as np
import prometheus_async
import spead2.send.asyncio
from katsdptelstate.endpoint import endpoint_list_parser
from numba import njit
from prometheus_client import Counter, Gauge
from .. import (
BYTE_BITS,
COMPLEX,
DEFAULT_JONES_PER_BATCH,
DEFAULT_PACKET_PAYLOAD_BYTES,
N_POLS,
SPEAD_DESCRIPTOR_INTERVAL_S,
spead,
)
from ..fgpu.send import PREAMBLE_SIZE, make_descriptor_heap, make_item_group
from ..main import add_common_arguments, add_gc_stats, add_send_arguments, add_time_converter_arguments
from ..send import DescriptorSender
from ..utils import TimeConverter
from . import METRIC_NAMESPACE
DTYPE = np.dtype(np.int8)
#: Number of heaps in time to keep in flight
QUEUE_DEPTH = 8
output_heaps_counter = Counter("output_heaps", "number of heaps transmitted", namespace=METRIC_NAMESPACE)
output_bytes_counter = Counter("output_bytes", "number of payload bytes transmitted", namespace=METRIC_NAMESPACE)
time_error_gauge = Gauge(
"time_error_s", "elapsed time minus expected elapsed time", ["stream"], namespace=METRIC_NAMESPACE
)
[docs]
def parse_args(arglist: Sequence[str] | None = None) -> argparse.Namespace:
"""Parse the command-line arguments."""
parser = argparse.ArgumentParser(prog="fsim")
add_time_converter_arguments(parser, sync_time_required=False)
parser.add_argument(
"--array-size", type=int, default=80, help="Number of antennas in the simulated array [%(default)s]"
)
parser.add_argument(
"--channels", type=int, default=32768, help="Total number of channels in the simulated stream [%(default)s]"
)
parser.add_argument(
"--channels-per-substream", type=int, default=512, help="Number of channels sent by this fsim [%(default)s]"
)
parser.add_argument(
"--samples-between-spectra", type=int, help="Number of digitiser samples between spectra [2*channels]"
)
parser.add_argument(
"--jones-per-batch",
type=int,
default=DEFAULT_JONES_PER_BATCH,
help="Number of antenna-channelised-voltage Jones vectors in each F-engine batch [%(default)s]",
)
add_send_arguments(parser, prefix="")
parser.add_argument(
"--send-packet-payload",
type=int,
default=DEFAULT_PACKET_PAYLOAD_BYTES,
metavar="BYTES",
help="Size for output packets (voltage payload only) [%(default)s]",
)
parser.add_argument(
"--main-affinity", type=int, default=-1, help="Core affinity for the main Python thread [not bound]"
)
add_common_arguments(parser, katcp=False, aiomonitor=False)
parser.add_argument("--run-once", action="store_true", help="Transmit a single collection of heaps before exiting")
parser.add_argument(
"dest",
type=endpoint_list_parser(spead.DEFAULT_PORT),
metavar="X.X.X.X[+N]:PORT",
help="Destination addresses (one per xb-engine)",
)
args = parser.parse_args(arglist)
if args.samples_between_spectra is None:
args.samples_between_spectra = 2 * args.channels
if args.jones_per_batch % args.channels != 0:
parser.error("--jones-per-batch must be divisible by --channels")
return args
[docs]
@njit
def make_heap_payload(
out: np.ndarray,
heap_index: int,
feng_id: int,
n_ants: int,
) -> None:
"""Create the simulated payload data for a heap.
A pattern is chosen that will hopefully be easy to verify at the receiver
graphically. On each F-Engine, the signal amplitude will increase linearly
over time for each channel. Each channel will have a different starting
amplitude but the rate of increase will be the same for all channels.
Each F-Engine will have the same same signal amplitude for the same
timestamp, but the signal phase will be different. The signal phase remains
constant across all channels in a single F-Engine. By examining the signal
phase it can be verified that correct feng_id is attached to the correct
data.
These samples need to be stored as 8 bit samples. As such, the amplitude is
wrapped each time it reaches 127. 127 is used as the amplitude when
multiplied by the phase can reach -127. The full range of values is
covered.
This current format is not fixed and it is likely that it will be adjusted
to be suited for different verification needs.
Parameters
----------
out
Output array, with shape (n_channels_per_substream, n_spectra_per_heap, N_POLS, COMPLEX)
heap_index
Heap index on time axis
feng_id
Heap index on antenna axis
n_ants
Number of antennas in the array
"""
n_channels_per_substream = out.shape[0]
n_spectra_per_heap = out.shape[1]
initial_offset = heap_index * n_spectra_per_heap
sample_angle = 2.0 * np.pi / (n_ants * N_POLS) * (feng_id * N_POLS + np.arange(2))
for c in range(n_channels_per_substream):
for t in range(n_spectra_per_heap):
sample_amplitude = (initial_offset + c * 10 + t) % 127
for p in range(N_POLS):
out[c][t][p][0] = sample_amplitude * np.cos(sample_angle[p])
out[c][t][p][1] = sample_amplitude * np.sin(sample_angle[p])
[docs]
def make_heap(
timestamp: np.ndarray, feng_id: int, channel_offset: int, payload: np.ndarray
) -> spead2.send.HeapReference:
"""Create a heap to transmit.
The `timestamp` must be a zero-dimensional array of dtype ``>u8``. It will
be transmitted by reference, so it can be updated in place to change the
stored timestamp.
"""
# The heap setup should be equivalent to katgpucbf.fgpu.send.Batch
item_group = make_item_group(shape=payload.shape, dtype=payload.dtype)
item_group[spead.TIMESTAMP_ID].value = timestamp
item_group[spead.FENG_ID_ID].value = feng_id
item_group[spead.FREQUENCY_ID].value = channel_offset
item_group[spead.FENG_RAW_ID].value = payload
heap = item_group.get_heap(descriptors="none", data="all")
heap.repeat_pointers = True
return spead2.send.HeapReference(heap)
[docs]
def make_stream(args: argparse.Namespace, idx: int, data: np.ndarray) -> "spead2.send.asyncio.AsyncStream":
"""Create a SPEAD stream for a single destination.
Parameters
----------
args
Command-line arguments
idx
Index into the destinations to use
data
All payload data for this destination
"""
overhead = 1 + PREAMBLE_SIZE / args.send_packet_payload
# Data rate for the entire array, excluding packet overhead
bandwidth = args.adc_sample_rate * args.channels / args.samples_between_spectra
full_rate = bandwidth * N_POLS * COMPLEX * DTYPE.itemsize * args.array_size
rate = full_rate * args.channels_per_substream / args.channels * overhead
print(f"Rate for {args.dest[idx]}: {rate * 8e-9:.3f} Gbps", flush=True)
config = spead2.send.StreamConfig(
max_packet_size=args.send_packet_payload + PREAMBLE_SIZE,
rate=rate,
max_heaps=QUEUE_DEPTH * args.array_size + 1, # + 1 for descriptor heaps
)
thread_pool = spead2.ThreadPool(1, [] if args.affinity < 0 else [args.affinity])
endpoints = [(args.dest[idx].host, args.dest[idx].port)]
if args.ibv:
udp_ibv_config = spead2.send.UdpIbvConfig(
endpoints=endpoints,
interface_address=args.interface,
ttl=args.ttl,
memory_regions=[data],
)
return spead2.send.asyncio.UdpIbvStream(thread_pool, config, udp_ibv_config)
else:
return spead2.send.asyncio.UdpStream(thread_pool, endpoints, config, args.ttl, args.interface)
[docs]
class Sender:
"""Manage sending data to a single XB-engine."""
def __init__(self, args: argparse.Namespace, idx: int) -> None:
n_spectra_per_heap = args.jones_per_batch // args.channels
self.data = np.empty(
(
QUEUE_DEPTH,
args.array_size,
args.channels_per_substream,
n_spectra_per_heap,
N_POLS,
COMPLEX,
),
DTYPE,
)
self.timestamp_step = args.samples_between_spectra * n_spectra_per_heap
self.timestamps = np.empty(QUEUE_DEPTH, spead.IMMEDIATE_DTYPE)
self.batches: list[spead2.send.HeapReferenceList] = []
for i in range(QUEUE_DEPTH):
batch_heaps = []
for j in range(args.array_size):
payload = self.data[i, j]
make_heap_payload(payload, i, j, args.array_size)
# The ... makes numpy return a 0d array instead of a scalar
batch_heaps.append(
make_heap(
self.timestamps[i, ...], j, channel_offset=idx * args.channels_per_substream, payload=payload
)
)
self.batches.append(spead2.send.HeapReferenceList(batch_heaps))
self.stream = make_stream(args, idx, self.data)
self.time_error_gauge = time_error_gauge.labels(str(idx))
self.batch_heaps = args.array_size
self.batch_bytes = self.data[0].nbytes
# Actual sync time will be filled in by run().
self.time_converter = TimeConverter(0.0, args.adc_sample_rate)
self.descriptor_heap = make_descriptor_heap(
channels_per_substream=args.channels_per_substream,
spectra_per_heap=n_spectra_per_heap,
sample_bits=DTYPE.itemsize * BYTE_BITS,
)
def _update_metrics(self, end_timestamp: int, future: asyncio.Future) -> None:
end_time = self.time_converter.adc_to_unix(end_timestamp)
self.time_error_gauge.set(time.time() - end_time)
output_heaps_counter.inc(self.batch_heaps)
output_bytes_counter.inc(self.batch_bytes)
[docs]
async def run(self, sync_time: float, run_once: bool) -> None:
"""Send heaps until cancelled."""
self.time_converter.sync_time = sync_time
futures: list[asyncio.Future[int]] = [asyncio.get_running_loop().create_future() for _ in range(QUEUE_DEPTH)]
for i in range(QUEUE_DEPTH):
futures[i].set_result(0) # Make the future ready
timestamp = 0
for idx in itertools.cycle(range(QUEUE_DEPTH)):
await futures[idx] # Wait for previous transmission of this batch
self.timestamps[idx] = timestamp
futures[idx] = self.stream.async_send_heaps(self.batches[idx], spead2.send.GroupMode.ROUND_ROBIN)
timestamp += self.timestamp_step
if run_once:
break
futures[idx].add_done_callback(functools.partial(self._update_metrics, timestamp))
await asyncio.gather(*futures)
async def _async_main(tg: asyncio.TaskGroup) -> None:
"""Real implementation of :func:`async_main`.
This is split into a separate function to avoid having to indent the whole
thing inside the `tg` context manager.
"""
args = parse_args()
add_gc_stats()
if args.prometheus_port is not None:
await prometheus_async.aio.web.start_http_server(port=args.prometheus_port)
senders = [Sender(args, i) for i in range(len(args.dest))]
descriptor_senders = [
DescriptorSender(sender.stream, sender.descriptor_heap, SPEAD_DESCRIPTOR_INTERVAL_S) for sender in senders
]
for descriptor_sender in descriptor_senders:
tg.create_task(descriptor_sender.run())
if args.main_affinity >= 0:
os.sched_setaffinity(0, [args.main_affinity])
if args.sync_time is not None:
sync_time = args.sync_time
else:
sync_time = time.time()
async with asyncio.TaskGroup() as sender_tg:
for sender in senders:
sender_tg.create_task(sender.run(sync_time, args.run_once))
for descriptor_sender in descriptor_senders:
descriptor_sender.halt()
[docs]
async def async_main() -> None:
"""Run main program."""
async with asyncio.TaskGroup() as tg:
await _async_main(tg)
[docs]
def main() -> None:
"""Run main program."""
katsdpservices.setup_logging()
asyncio.run(async_main())
if __name__ == "__main__":
main()