################################################################################
# Copyright (c) 2024-2025, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Compute statistics by bucketing into windows."""
from dataclasses import dataclass
from typing import Final, Protocol, Self
# Based on typing.SupportsAbs etc.
class _SupportsAdd[T](Protocol):
def __add__(self, other: Self) -> T:
pass # pragma: nocover
[docs]
@dataclass(frozen=True)
class Measurement[T: _SupportsAdd]:
"""A measurement returned by :meth:`Accum.add`."""
start_timestamp: int
end_timestamp: int
#: Total of the value over the provided data.
total: T | None
def _add[T: _SupportsAdd](a: T | None, b: T | None) -> T | None:
"""Add two values, returning None if either of them is None."""
if a is not None and b is not None:
return a + b
else:
return None
[docs]
class Accum[T: _SupportsAdd]:
"""Accumulator for a single statistic.
The statistic is a linear measurement over intervals of time.
Measurements over small intervals are provided to this class, which
accumulates them. Time is divided into fixed "windows" (all the same
length), with a total produced for each window for which at least some data
is provided. If a window is missing some data, either because it was not
provided or it was explicitly indicated that it was missing, then the
window measurement will instead indicate that the sum is unknown.
Time is considered to be discrete (integer). Window intervals are
timestamps that are multiples of `window_size`. Data cannot be added for
intervals that cross window boundaries.
Parameters
----------
window_size
Factor that divides timestamp window boundaries
zero
Value to initialise the accumulator to
"""
def __init__(self, window_size: int, zero: T) -> None:
self._window_size: Final = window_size
self._zero: Final = zero
self._total: T | None = zero # Sum for current window, if valid
# Point up to which we have received calls to :meth:`add`
self._end_timestamp = 0
self._window_id = 0 # Start timestamp divided by the window size
def _flush(self, new_window_id: int) -> Measurement | None:
"""Generate a :class:`Measurement` (if non-empty) and reset state."""
# [base_timestamp, next_timestamp) is the full range of the current window
base_timestamp = self._window_id * self._window_size
next_timestamp = base_timestamp + self._window_size
ret: Measurement | None = None
# Don't send if the old window was completely empty
if self._end_timestamp > base_timestamp:
# If we didn't get the end of the window, then we don't know the total
total = self._total if self._end_timestamp == next_timestamp else None
ret = Measurement(
base_timestamp,
next_timestamp,
total,
)
# Reset the state
self._window_id = new_window_id
self._end_timestamp = new_window_id * self._window_size
self._total = self._zero
return ret
[docs]
def add(self, start_timestamp: int, end_timestamp: int, value: T | None) -> Measurement | None:
"""Add new data.
If the new data falls into a new window compared to the existing data
or it completes the current window, then an instance of
:class:`Measurement` is returned with the total for the previous
window. Note that if both occur, the result for the old window is
simply discarded.
Parameters
----------
start_timestamp, end_timestamp
The time range for the extension
value
The statistic measured over the given time range, or
None if there was missing data in the range.
Raises
------
ValueError
If `start_timestamp` > `end_timestamp`
ValueError
If the new data overlaps or preceeds previous data
ValueError
If [start_timestamp, end_timestamp) crosses a window boundary
"""
if start_timestamp > end_timestamp:
raise ValueError("start_timestamp ({start_timestamp}) > end_timestamp ({end_timestamp})")
new_window_id = start_timestamp // self._window_size
new_window_end = (new_window_id + 1) * self._window_size
if end_timestamp > new_window_end:
raise ValueError("new data crosses a window boundary")
if start_timestamp < self._end_timestamp:
raise ValueError("new data starts before end of previous data")
ret: Measurement | None = None
if new_window_id != self._window_id:
# New data falls into a new window - flush out the old one.
ret = self._flush(new_window_id)
if start_timestamp != self._end_timestamp:
# We skipped some data
value = None
self._total = _add(self._total, value)
self._end_timestamp = end_timestamp
if end_timestamp == new_window_end:
# New data completes a window - flush it now.
ret = self._flush(self._window_id + 1)
return ret