katgpucbf.utils module

A collection of utility functions for katgpucbf.

class katgpucbf.utils.DeviceStatusSensor(target: SensorSet, name: str = 'device-status', description: str = 'Overall engine health')[source]

Bases: SimpleAggregateSensor[DeviceStatus]

Summary sensor for quickly ascertaining device status.

Its value is the worst status among the sensors in its target set, so a single reading shows whether anything is wrong or everything is healthy.

aggregate_add(sensor: Sensor[_T], reading: Reading[_T]) bool[source]

Update internal state with an additional reading.

Returns:

True if the new reading should result in a state update.

Return type:

bool

aggregate_compute() tuple[Status, DeviceStatus][source]

Compute aggregate status and value from the internal state.

aggregate_remove(sensor: Sensor[_T], reading: Reading[_T]) bool[source]

Update internal state by removing a reading.

Returns:

True if removing the reading should result in a state update.

Return type:

bool
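The add/remove/compute pattern above can be sketched as a running count of statuses from which the worst is picked. This is only an illustration under stated assumptions, not the katgpucbf implementation: HealthSketch is a hypothetical stand-in for DeviceStatus, WorstOfSketch is a hypothetical class that does not derive from SimpleAggregateSensor, and the mapping from individual sensor statuses to a device status is left out.

```python
from collections import Counter
from enum import IntEnum


class HealthSketch(IntEnum):
    """Hypothetical stand-in for DeviceStatus; a larger value means worse."""

    OK = 1
    DEGRADED = 2
    FAIL = 3


class WorstOfSketch:
    """Track how many readings have each status and report the worst one."""

    def __init__(self) -> None:
        self._counts: Counter[HealthSketch] = Counter()

    def worst(self) -> HealthSketch:
        # Only statuses with at least one live reading take part.
        present = [health for health, n in self._counts.items() if n > 0]
        # Assumption: an empty set of readings counts as OK.
        return max(present, default=HealthSketch.OK)

    def add(self, health: HealthSketch) -> bool:
        """Count a new reading; return True if the worst status changed."""
        before = self.worst()
        self._counts[health] += 1
        return self.worst() != before

    def remove(self, health: HealthSketch) -> bool:
        """Drop a reading; return True if the worst status changed."""
        before = self.worst()
        self._counts[health] -= 1
        return self.worst() != before
```

Keeping counts rather than a single "worst so far" value is what makes removal cheap: when the last failing reading goes away, the next-worst status is still known.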

filter_aggregate(sensor: Sensor) bool[source]

Decide whether another sensor is part of the aggregation.

Users can override this function to exclude certain categories of sensors, such as other aggregates, to prevent circular references.

Returns:

True if sensor should be included in calculation of the aggregate, False if not.

Return type:

bool

update_aggregate(updated_sensor: Sensor[_T] | None, reading: Reading[_T] | None, old_reading: Reading[_T] | None) Reading[DeviceStatus] | None[source]

Update the aggregated sensor.

The user is required to override this function, which must return the updated Reading (i.e. value, status and timestamp) which will be reflected in the Reading of the aggregated sensor.

Parameters:
  • updated_sensor – The sensor in the target SensorSet which has changed in some way.

  • reading – The current reading of the updated_sensor. This is None if the sensor is being removed from the set.

  • old_reading – The previous reading of the updated_sensor. This is None if the sensor is being added to the set.

Returns:

The reading (value, status, timestamp) that should be shown by the AggregatedSensor as a result of the change. If None is returned, the sensor’s reading is not modified.

Return type:

Optional[Reading]

class katgpucbf.utils.TimeConverter(sync_time: float, adc_sample_rate: float)[source]

Bases: object

Convert times between UNIX timestamps and ADC sample counts.

Note that because UNIX timestamps are handled as 64-bit floats, they are only accurate to roughly microsecond precision, and will not round-trip precisely.

Parameters:
  • sync_time – UNIX timestamp corresponding to ADC timestamp 0

  • adc_sample_rate – Number of ADC samples per second

Todo: This does not yet handle leap seconds correctly.

adc_to_unix(samples: float) float[source]

Convert an ADC sample count to a UNIX timestamp.

unix_to_adc(timestamp: float) float[source]

Convert a UNIX timestamp to an ADC sample count.
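The arithmetic implied by the two parameters can be sketched as follows. This is a minimal illustration, assuming a plain linear mapping between the two time scales; SketchTimeConverter is a hypothetical name, the sync time and sample rate are made-up values, and the real class may differ in detail.

```python
class SketchTimeConverter:
    """Illustrative stand-in, not the katgpucbf implementation."""

    def __init__(self, sync_time: float, adc_sample_rate: float) -> None:
        self.sync_time = sync_time  # UNIX time of ADC sample 0
        self.adc_sample_rate = adc_sample_rate  # samples per second

    def adc_to_unix(self, samples: float) -> float:
        # Elapsed seconds since sync, offset by the sync time.
        return self.sync_time + samples / self.adc_sample_rate

    def unix_to_adc(self, timestamp: float) -> float:
        # Elapsed seconds since sync, scaled to samples.
        return (timestamp - self.sync_time) * self.adc_sample_rate


# Made-up values, chosen only for illustration.
conv = SketchTimeConverter(sync_time=1_700_000_000.0, adc_sample_rate=1_712_000_000.0)
samples = 12_345_678_901
unix = conv.adc_to_unix(samples)
round_trip = conv.unix_to_adc(unix)
# Typically non-zero: the float timestamp cannot resolve a single sample.
error_samples = abs(round_trip - samples)
```

At gigahertz sample rates one sample lasts well under a nanosecond, far below the resolution of a 64-bit float holding a present-day UNIX timestamp, which is why the round trip is approximate.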

class katgpucbf.utils.TimeoutSensorStatusObserver(sensor: Sensor, timeout: float, new_status: Status)[source]

Bases: object

Change the status of a sensor if it doesn’t receive an update for a given time.

Do not directly attach or detach this observer from the sensor (it does this internally). It is not necessary to retain a reference to the object unless you wish to interact with it later (for example, by calling cancel()).

It must be constructed while there is a running event loop.

cancel() None[source]

Detach from the sensor and make no further updates to it.
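The general mechanism (restart a countdown on every update, act when it expires) can be sketched with asyncio alone. This is a hedged illustration, not the katgpucbf code: TimeoutWatchSketch, touch and the on_timeout callback are hypothetical names, no real sensor is involved, and starting the countdown at construction is an assumption. It does show why a running event loop is needed at construction time.

```python
import asyncio
from collections.abc import Callable


class TimeoutWatchSketch:
    """Call on_timeout if touch() is not called for `timeout` seconds."""

    def __init__(self, timeout: float, on_timeout: Callable[[], None]) -> None:
        # Raises RuntimeError if no event loop is running.
        self._loop = asyncio.get_running_loop()
        self._timeout = timeout
        self._on_timeout = on_timeout
        self._handle: asyncio.TimerHandle | None = self._loop.call_later(
            timeout, on_timeout
        )

    def touch(self) -> None:
        """Record an update: restart the countdown (no-op after cancel)."""
        if self._handle is not None:
            self._handle.cancel()
            self._handle = self._loop.call_later(self._timeout, self._on_timeout)

    def cancel(self) -> None:
        """Stop the countdown and make no further callbacks."""
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None


async def demo() -> list[str]:
    events: list[str] = []
    watch = TimeoutWatchSketch(0.05, lambda: events.append("timed out"))
    await asyncio.sleep(0.02)
    watch.touch()  # update arrives in time, so the countdown restarts
    await asyncio.sleep(0.02)
    events.append("still fresh")
    await asyncio.sleep(0.1)  # no further updates, so the timeout fires
    watch.cancel()
    return events
```

Running `asyncio.run(demo())` exercises both paths: one update that arrives in time and one gap long enough to trigger the callback.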

katgpucbf.utils.add_gc_stats() None[source]

Add Prometheus metrics for garbage collection timing.

It is only safe to call this once.

katgpucbf.utils.add_signal_handlers(server: DeviceServer) None[source]

Arrange for clean shutdown on SIGINT (Ctrl-C) or SIGTERM.

katgpucbf.utils.add_time_sync_sensors(sensors: SensorSet) Task[source]

Add a number of sensors to a device server to track time synchronisation.

This must be called with an event loop running. It returns a task that keeps the sensors periodically updated.

katgpucbf.utils.comma_split(base_type: Callable[[str], _T], count: int | None = None, allow_single=False) Callable[[str], list[_T]][source]

Return a function to split a comma-delimited str into a list of type _T.

This function is used to parse lists of CPU core numbers, which arrive from the command line as comma-separated strings but are more useful as a list of ints. It is generic enough to process lists of other types as well.

Parameters:
  • base_type – The base type of thing you expect in the list, e.g. int, float.

  • count – How many of them you expect to be in the list. None means the list could be any length.

  • allow_single – If true (defaults to false), allow a single value to be used when count is greater than 1. In this case, it will be repeated count times.

katgpucbf.utils.gaussian_dtype(bits: int) dtype[source]

Get numpy dtype for a Gaussian (complex) integer.

Parameters:

bits – Number of bits in each real component

katgpucbf.utils.parse_source(value: str) list[tuple[str, int]] | str[source]

Parse a string into a list of IP endpoints.

katgpucbf.utils.steady_state_timestamp_sensor() Sensor[int][source]

Create steady-state-timestamp sensor.