katgpucbf.utils module
A collection of utility functions for katgpucbf.
- class katgpucbf.utils.DeviceStatusSensor(target: SensorSet, name: str = 'device-status', description: str = 'Overall engine health')[source]
Bases:
SimpleAggregateSensor[DeviceStatus]Summary sensor for quickly ascertaining device status.
This takes its value from the worst status of its target set of sensors, so it’s quick to identify if there’s something wrong, or if everything is good.
- aggregate_add(sensor: Sensor, reading: Reading) bool[source]
Update internal state with an additional reading.
- Returns:
True if the new reading should result in a state update.
- Return type:
- aggregate_compute() tuple[Status, DeviceStatus][source]
Compute aggregate status and value from the internal state.
- aggregate_remove(sensor: Sensor, reading: Reading) bool[source]
Update internal state by removing a reading.
- Returns:
True if removing the reading should result in a state update.
- Return type:
- filter_aggregate(sensor: Sensor) bool[source]
Decide whether another sensor is part of the aggregation.
Users can override this function to exclude certain categories of sensors, such as other aggregates, to prevent circular references.
- Returns:
True if sensor should be included in calculation of the aggregate, False if not.
- Return type:
- update_aggregate(updated_sensor: Sensor | None, reading: Reading | None, old_reading: Reading | None) Reading[DeviceStatus] | None[source]
Update the aggregated sensor.
The user is required to override this function, which must return the updated
Reading(i.e. value, status and timestamp) which will be reflected in the Reading of the aggregated sensor.- Parameters:
updated_sensor – The sensor in the target
SensorSetwhich has changed in some way.reading – The current reading of the updated_sensor. This is None if the sensor is being removed from the set.
old_reading – The previous reading of the updated_sensor. This is None if the sensor is being added to the set.
- Returns:
The reading (value, status, timestamp) that should be shown by the AggregatedSensor as a result of the change. If None is returned, the sensor’s reading is not modified.
- Return type:
Optional[Reading]
- class katgpucbf.utils.DitherType(*values)[source]
Bases:
EnumType of dithering to apply prior to quantisation.
- DEFAULT = 1
- NONE = 0
- UNIFORM = 1
- class katgpucbf.utils.Engine(host: str, port: int)[source]
Bases:
DeviceServerCommon base for engines (katcp device servers).
- add_service_task(task: Task, *, wait_on_stop: bool = False) None[source]
Register an asynchronous task that runs as part of the server.
This extends
aiokatcp.DeviceServer.add_service_task()with the wait_on_stop parameter. If true, stopping the server will wait for the task to complete rather than cancelling it, unless one of the service tasks raised an exception (in which case waiting may hang because things are not shutting down cleanly).
- class katgpucbf.utils.TimeConverter(sync_time: float, adc_sample_rate: float)[source]
Bases:
objectConvert times between UNIX timestamps and ADC sample counts.
Note that because UNIX timestamps are handled as 64-bit floats, they are only accurate to roughly microsecond precision, and will not round-trip precisely.
- Parameters:
sync_time – UNIX timestamp corresponding to ADC timestamp 0
adc_sample_rate – Number of ADC samples per second
todo:: (..) – This does not yet handle leap-seconds correctly.
- class katgpucbf.utils.TimeoutSensorStatusObserver(sensor: Sensor, timeout: float, new_status: Status)[source]
Bases:
objectChange the status of a sensor if it doesn’t receive an update for a given time.
Do not directly attach or detach this observer from the sensor (it does this internally). It is not necessary to retain a reference to the object unless you wish to interact with it later (for example, by calling
cancel()).It must be constructed while there is a running event loop.
- katgpucbf.utils.add_time_sync_sensors(sensors: SensorSet) Task[source]
Add a number of sensors to a device server to track time synchronisation.
This must be called with an event loop running. It returns a task that keeps the sensors periodically updated.
- katgpucbf.utils.gaussian_dtype(bits: int) dtype[source]
Get numpy dtype for a Gaussian (complex) integer.
- Parameters:
bits – Number of bits in each real component
- katgpucbf.utils.make_rate_limited_sensor(sensor_type: type[T], name: str, description: str = '', units: str = '', default: T | None = None, initial_status: Status = Status.UNKNOWN, *args, **kwargs) Sensor[source]
Create a sensor whose auto strategy has a minimum update interval of MIN_SENSOR_UPDATE_PERIOD.