################################################################################
# Copyright (c) 2022-2024, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Shared utilities for sending data over SPEAD."""
import asyncio
import logging
from collections.abc import Iterable
import spead2.send.asyncio
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class DescriptorSender:
    """Periodically transmit a descriptor heap on a SPEAD stream.

    One transmission happens as soon as :meth:`run` starts. The second
    follows `first_interval` seconds later, and every subsequent one is
    spaced `interval` seconds after its predecessor. Giving each sender its
    own `first_interval` staggers them, so that a receiver shared between
    several senders is not hit by all of their descriptors at once.

    Descriptors are queued with a rate of zero so that they do not disturb
    the pacing of the other packets on the stream.

    Parameters
    ----------
    stream
        Stream on which the descriptors are transmitted. Which of its
        substreams receive them is controlled by `substreams`.
    descriptors
        Heap holding the descriptors.
    interval
        Steady-state period, in seconds, between transmissions.
    first_interval
        Gap, in seconds, between the initial transmission and the second
        one. When ``None``, `interval` is used.
    substreams
        Indices of the substreams that receive the descriptors. By default
        only substream 0 is used.
    """

    def __init__(
        self,
        stream: "spead2.send.asyncio.AsyncStream",
        descriptors: spead2.send.Heap,
        interval: float,
        first_interval: float | None = None,
        *,
        substreams: Iterable[int] = (0,),
    ) -> None:
        self._stream = stream
        # One zero-rate reference to the same heap per requested substream.
        references = []
        for index in substreams:
            references.append(
                spead2.send.HeapReference(descriptors, substream_index=index, rate=0.0)
            )
        self._heap_reference_list = spead2.send.HeapReferenceList(references)
        self._interval = interval
        if first_interval is None:
            self._first_interval = interval
        else:
            self._first_interval = first_interval
        # Set by halt() to make run() return.
        self._halt_event = asyncio.Event()

    async def _send_descriptors(self) -> None:
        """Send the prepared heap references once and wait for completion."""
        logger.debug("Sending descriptors")
        await self._stream.async_send_heaps(
            heaps=self._heap_reference_list,
            mode=spead2.send.GroupMode.ROUND_ROBIN,
        )

    def halt(self) -> None:
        """Ask :meth:`run` to finish; returns immediately without waiting."""
        self._halt_event.set()

    async def run(self) -> None:
        """Keep sending descriptors until :meth:`halt` is called or the task is cancelled."""
        loop = asyncio.get_running_loop()
        next_send = loop.time()
        # Only the first gap uses first_interval; it is replaced below.
        period = self._first_interval
        while True:
            if self._halt_event.is_set():
                break
            await self._send_descriptors()
            # Schedule against an absolute deadline so that the time spent
            # sending does not accumulate as systematic drift.
            next_send += period
            # Convert to a relative wait, with a floor of 1% of the period
            # so that we still yield briefly when running behind schedule.
            shortest_wait = period * 0.01
            remaining = next_send - loop.time()
            wait_time = remaining if remaining > shortest_wait else shortest_wait
            try:
                # Returns early if halt() fires; otherwise times out at the deadline.
                await asyncio.wait_for(self._halt_event.wait(), timeout=wait_time)
            except TimeoutError:
                # No halt request arrived during the wait: carry on.
                pass
            period = self._interval
def send_rate(
    packet_header: int,
    packet_payload: int,
    heap_payload: int,
    heap_interval: float,
    send_rate_factor: float,
) -> float:
    """Work out the transmission rate, in bytes per second, to give to spead2.

    The rate covers the heap payload plus one packet header for every packet
    needed to carry it, spread over `heap_interval` and scaled up by
    `send_rate_factor`.

    Parameters
    ----------
    packet_header
        Per-packet overhead in bytes, counting the SPEAD header but not
        lower-level headers such as UDP/IP
    packet_payload
        Payload bytes to place in each packet
    heap_payload
        Payload bytes making up one heap
    heap_interval
        Seconds between successive heaps; 0 means send as fast as possible
    send_rate_factor
        Safety margin: the factor by which the send rate should exceed the
        rate at which data arrives

    Returns
    -------
    float
        The rate in bytes per second, or 0.0 when `heap_interval` is 0.
    """
    if heap_interval != 0.0:
        # Ceiling division: a partially-filled final packet still needs a header.
        n_packets = (heap_payload + packet_payload - 1) // packet_payload
        header_bytes = n_packets * packet_header
        return (heap_payload + header_bytes) / heap_interval * send_rate_factor
    return 0.0