katgpucbf.utils module
A collection of utility functions for katgpucbf.
- class katgpucbf.utils.DeviceStatusSensor(target: SensorSet, name: str = 'device-status', description: str = 'Overall engine health')[source]
Bases:
SimpleAggregateSensor[DeviceStatus]Summary sensor for quickly ascertaining device status.
This takes its value from the worst status of its target set of sensors, so it’s quick to identify if there’s something wrong, or if everything is good.
- aggregate_add(sensor: Sensor, reading: Reading) bool[source]
Update internal state with an additional reading.
- Returns:
True if the new reading should result in a state update.
- Return type:
- aggregate_compute() tuple[Status, DeviceStatus][source]
Compute aggregate status and value from the internal state.
- aggregate_remove(sensor: Sensor, reading: Reading) bool[source]
Update internal state by removing a reading.
- Returns:
True if removing the reading should result in a state update.
- Return type:
- filter_aggregate(sensor: Sensor) bool[source]
Decide whether another sensor is part of the aggregation.
Users can override this function to exclude certain categories of sensors, such as other aggregates, to prevent circular references.
- Returns:
True if sensor should be included in calculation of the aggregate, False if not.
- Return type:
- update_aggregate(updated_sensor: Sensor | None, reading: Reading | None, old_reading: Reading | None) Reading[DeviceStatus] | None[source]
Update the aggregated sensor.
The user is required to override this function, which must return the updated
Reading(i.e. value, status and timestamp) which will be reflected in the Reading of the aggregated sensor.- Parameters:
updated_sensor – The sensor in the target
SensorSetwhich has changed in some way.reading – The current reading of the updated_sensor. This is None if the sensor is being removed from the set.
old_reading – The previous reading of the updated_sensor. This is None if the sensor is being added to the set.
- Returns:
The reading (value, status, timestamp) that should be shown by the AggregatedSensor as a result of the change. If None is returned, the sensor’s reading is not modified.
- Return type:
Optional[Reading]
- class katgpucbf.utils.DiscardingIterator(base: AsyncIterable)[source]
Bases:
AsyncIterator,GenericIterator wrapper that can optionally direct items to the bin.
This iterator is in one of two modes:
In discarding mode, it actively pulls items from the underlying iterator and then throws them away (the “throw away” behaviour can be customised by overriding
discard()). Attempting to iterate will immediately raiseStopAsyncIteration.In capturing mode, it simply wraps the underlying iterator. However, it will pre-emptively fetch one item ahead.
Mode switching is best done by using the context manager protocol: entering the context switches to capturing mode, and leaving it returns to discarding mode.
It is safe to switch from capturing to discarding mode at the same time as iterating; this will cause the iteration to be interrupted (although it may iterate one more item if it is ready).
The implementation is currently not robust against the base iterator raising exceptions. The exception will be logged by it will terminate iteration.
- discard(item: T) None[source]
Discard an item.
This can be overridden to customise cleanup of unwanted items.
- start_discarding() None[source]
Switch to discarding mode.
If there is an active call to
__anext__(), it will raiseStopAsyncIteration(it could also iterate one more item if it is already available).
- class katgpucbf.utils.DitherType(*values)[source]
Bases:
EnumType of dithering to apply prior to quantisation.
- DEFAULT = 1
- NONE = 0
- UNIFORM = 1
- class katgpucbf.utils.Engine(host: str, port: int)[source]
Bases:
DeviceServerCommon base for engines (katcp device servers).
- add_service_task(task: Task, *, wait_on_stop: bool = False) None[source]
Register an asynchronous task that runs as part of the server.
This extends
aiokatcp.DeviceServer.add_service_task()with the wait_on_stop parameter. If true, stopping the server will wait for the task to complete rather than cancelling it, unless one of the service tasks raised an exception (in which case waiting may hang because things are not shutting down cleanly).
- class katgpucbf.utils.TimeConverter(sync_time: float, adc_sample_rate: float)[source]
Bases:
objectConvert times between UNIX timestamps and ADC sample counts.
Note that because UNIX timestamps are handled as 64-bit floats, they are only accurate to roughly microsecond precision, and will not round-trip precisely.
- Parameters:
sync_time – UNIX timestamp corresponding to ADC timestamp 0
adc_sample_rate – Number of ADC samples per second
todo:: (..) – This does not yet handle leap-seconds correctly.
- class katgpucbf.utils.TimeoutSensorStatusObserver(sensor: Sensor, timeout: float, new_status: Status)[source]
Bases:
objectChange the status of a sensor if it doesn’t receive an update for a given time.
Do not directly attach or detach this observer from the sensor (it does this internally). It is not necessary to retain a reference to the object unless you wish to interact with it later (for example, by calling
cancel()).It must be constructed while there is a running event loop.
- katgpucbf.utils.add_time_sync_sensors(sensors: SensorSet) Task[source]
Add a number of sensors to a device server to track time synchronisation.
This must be called with an event loop running. It returns a task that keeps the sensors periodically updated.
- katgpucbf.utils.gaussian_dtype(bits: int) dtype[source]
Get numpy dtype for a Gaussian (complex) integer.
- Parameters:
bits – Number of bits in each real component
- katgpucbf.utils.make_rate_limited_sensor(sensor_type: type[T], name: str, description: str = '', units: str = '', default: T | None = None, initial_status: Status = Status.UNKNOWN, *args, **kwargs) Sensor[source]
Create a sensor whose auto strategy has a minimum update interval of MIN_SENSOR_UPDATE_PERIOD.