katgpucbf.utils module

A collection of utility functions for katgpucbf.

class katgpucbf.utils.DeviceStatusSensor(target: SensorSet, name: str = 'device-status', description: str = 'Overall engine health')[source]

Bases: SimpleAggregateSensor[DeviceStatus]

Summary sensor for quickly ascertaining device status.

This takes its value from the worst status among its target set of sensors, so a single reading shows whether anything is wrong or everything is good.

aggregate_add(sensor: Sensor, reading: Reading) bool[source]

Update internal state with an additional reading.

Returns:

True if the new reading should result in a state update.

Return type:

bool

aggregate_compute() tuple[Status, DeviceStatus][source]

Compute aggregate status and value from the internal state.

aggregate_remove(sensor: Sensor, reading: Reading) bool[source]

Update internal state by removing a reading.

Returns:

True if removing the reading should result in a state update.

Return type:

bool
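The add/remove/compute trio can be illustrated with a small stand-alone sketch. It is not the katgpucbf implementation: the real methods receive a Sensor and a Reading, whereas this simplified version takes a status directly. The DeviceStatus members and their ordering (higher means worse) are assumptions made for illustration, as is the WorstStatusAggregate class name.

```python
import enum
from collections import Counter


class DeviceStatus(enum.IntEnum):
    """Hypothetical stand-in; higher values are assumed to be worse."""

    OK = 1
    DEGRADED = 2
    FAIL = 3


class WorstStatusAggregate:
    """Tracks how many readings currently hold each status."""

    def __init__(self) -> None:
        self._counts = Counter()

    def aggregate_add(self, status: DeviceStatus) -> bool:
        # Returns True only if the aggregate value changes.
        before = self.aggregate_compute()
        self._counts[status] += 1
        return self.aggregate_compute() != before

    def aggregate_remove(self, status: DeviceStatus) -> bool:
        # Returns True only if the aggregate value changes.
        before = self.aggregate_compute()
        self._counts[status] -= 1
        return self.aggregate_compute() != before

    def aggregate_compute(self) -> DeviceStatus:
        # The worst status that at least one reading still holds;
        # an empty aggregate is treated as OK in this sketch.
        present = [s for s, n in self._counts.items() if n > 0]
        return max(present, default=DeviceStatus.OK)


agg = WorstStatusAggregate()
agg.aggregate_add(DeviceStatus.OK)
agg.aggregate_add(DeviceStatus.FAIL)
worst = agg.aggregate_compute()  # DeviceStatus.FAIL
```

Keeping per-status counts means a removal never requires rescanning every sensor, which is the point of splitting the work into add, remove and compute steps.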

filter_aggregate(sensor: Sensor) bool[source]

Decide whether another sensor is part of the aggregation.

Users can override this function to exclude certain categories of sensors, such as other aggregates, to prevent circular references.

Returns:

True if sensor should be included in calculation of the aggregate, False if not.

Return type:

bool

update_aggregate(updated_sensor: Sensor | None, reading: Reading | None, old_reading: Reading | None) Reading[DeviceStatus] | None[source]

Update the aggregated sensor.

The user is required to override this function, which must return the updated Reading (i.e. value, status and timestamp) which will be reflected in the Reading of the aggregated sensor.

Parameters:
  • updated_sensor – The sensor in the target SensorSet which has changed in some way.

  • reading – The current reading of the updated_sensor. This is None if the sensor is being removed from the set.

  • old_reading – The previous reading of the updated_sensor. This is None if the sensor is being added to the set.

Returns:

The reading (value, status, timestamp) that should be shown by the AggregatedSensor as a result of the change. If None is returned, the sensor’s reading is not modified.

Return type:

Optional[Reading]

class katgpucbf.utils.DitherType(*values)[source]

Bases: Enum

Type of dithering to apply prior to quantisation.
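Because DEFAULT and UNIFORM share the value 1, Python's enum machinery treats one as an alias of the other. The sketch below re-declares the enum locally (it does not import katgpucbf) and assumes UNIFORM is declared before DEFAULT in the real source, which would make UNIFORM the canonical name. The rendered documentation lists members alphabetically, so the true declaration order is not visible here.

```python
import enum


class DitherType(enum.Enum):
    """Local copy of the documented members, for illustration only."""

    NONE = 0
    UNIFORM = 1
    DEFAULT = 1  # same value as UNIFORM, so Python treats it as an alias


# An alias is the very same object as the member it duplicates.
assert DitherType.DEFAULT is DitherType.UNIFORM
# Lookup by value returns the first-declared (canonical) member.
assert DitherType(1).name == "UNIFORM"
# Iteration skips aliases, but __members__ still lists them.
assert [member.name for member in DitherType] == ["NONE", "UNIFORM"]
assert "DEFAULT" in DitherType.__members__
```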

DEFAULT = 1

NONE = 0

UNIFORM = 1

class katgpucbf.utils.Engine(host: str, port: int)[source]

Bases: DeviceServer

Common base for engines (katcp device servers).

BUILD_STATE: str = '0.1.dev294+ged2ec5ce5'

add_service_task(task: Task, *, wait_on_stop: bool = False) None[source]

Register an asynchronous task that runs as part of the server.

This extends aiokatcp.DeviceServer.add_service_task() with the wait_on_stop parameter. If true, stopping the server will wait for the task to complete rather than cancelling it, unless one of the service tasks raised an exception (in which case waiting may hang because things are not shutting down cleanly).
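The difference between cancelling and waiting can be sketched with plain asyncio. This is not the Engine code: stop_service_tasks is a hypothetical helper, and it omits the special case described above in which an exception in a service task disables waiting.

```python
import asyncio


async def stop_service_tasks(tasks: list[tuple[asyncio.Task, bool]]) -> None:
    """Stop (task, wait_on_stop) pairs: await the flagged ones, cancel the rest."""
    for task, wait_on_stop in tasks:
        if not wait_on_stop:
            task.cancel()
    # Wait for everything to finish; cancelled tasks finish promptly.
    await asyncio.gather(*(task for task, _ in tasks), return_exceptions=True)


async def demo() -> tuple[bool, bool]:
    async def flush() -> str:
        await asyncio.sleep(0.05)  # work that should be allowed to finish
        return "flushed"

    async def poll_forever() -> None:
        while True:
            await asyncio.sleep(0.01)

    flusher = asyncio.create_task(flush())
    poller = asyncio.create_task(poll_forever())
    await asyncio.sleep(0)  # let both tasks start
    await stop_service_tasks([(flusher, True), (poller, False)])
    return flusher.result() == "flushed", poller.cancelled()


result = asyncio.run(demo())
```

Here the flusher runs to completion while the poller is cancelled, which mirrors the behaviour that wait_on_stop is described as selecting.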

async on_stop() None[source]

Cancel tasks registered with add_service_task() on shutdown.

update_steady_state_timestamp(timestamp: int) None[source]

Update steady-state-timestamp sensor to at least timestamp.

class katgpucbf.utils.TimeConverter(sync_time: float, adc_sample_rate: float)[source]

Bases: object

Convert times between UNIX timestamps and ADC sample counts.

Note that because UNIX timestamps are handled as 64-bit floats, they are only accurate to roughly microsecond precision, and will not round-trip precisely.

Parameters:
  • sync_time – UNIX timestamp corresponding to ADC timestamp 0

  • adc_sample_rate – Number of ADC samples per second

Todo: This does not yet handle leap seconds correctly.

adc_to_unix(samples: float) float[source]

Convert an ADC sample count to a UNIX timestamp.

unix_to_adc(timestamp: float) float[source]

Convert a UNIX timestamp to an ADC sample count.
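The two conversions follow directly from the parameter definitions: sample 0 falls at sync_time, and each second adds adc_sample_rate samples. A minimal sketch under those definitions is shown below. SketchTimeConverter is a hypothetical stand-in rather than the library class, the numeric values are arbitrary, and leap seconds are ignored.

```python
class SketchTimeConverter:
    """Linear mapping between UNIX time and ADC sample count (illustrative)."""

    def __init__(self, sync_time: float, adc_sample_rate: float) -> None:
        self.sync_time = sync_time
        self.adc_sample_rate = adc_sample_rate

    def unix_to_adc(self, timestamp: float) -> float:
        # Seconds elapsed since the sync epoch, scaled to samples.
        return (timestamp - self.sync_time) * self.adc_sample_rate

    def adc_to_unix(self, samples: float) -> float:
        # Samples scaled to seconds, offset by the sync epoch.
        return samples / self.adc_sample_rate + self.sync_time


# Arbitrary example values, chosen only to make the arithmetic easy to follow.
converter = SketchTimeConverter(sync_time=1_700_000_000.0, adc_sample_rate=1_712_000_000.0)
one_second_in = converter.unix_to_adc(1_700_000_001.0)  # one second's worth of samples
```

Because both directions go through 64-bit floats, converting a timestamp to samples and back is only expected to agree to within rounding error, not exactly.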

class katgpucbf.utils.TimeoutSensorStatusObserver(sensor: Sensor, timeout: float, new_status: Status)[source]

Bases: object

Change the status of a sensor if it doesn’t receive an update for a given time.

Do not directly attach or detach this observer from the sensor (it does this internally). It is not necessary to retain a reference to the object unless you wish to interact with it later (for example, by calling cancel()).

It must be constructed while there is a running event loop.

cancel() None[source]

Detach from the sensor and make no further updates to it.
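The general pattern (restart a countdown on every update, act when it expires) can be sketched with asyncio alone. TimeoutWatchdog below is a hypothetical stand-in that calls a callback instead of changing a sensor status. It is not the katgpucbf class, and whether the real observer re-arms after firing is not specified here.

```python
import asyncio
from typing import Callable, Optional


class TimeoutWatchdog:
    """Calls on_timeout if notify() is not called within timeout seconds."""

    def __init__(self, timeout: float, on_timeout: Callable[[], None]) -> None:
        # get_running_loop() raises RuntimeError when no event loop is running,
        # which is why construction needs a running loop.
        self._loop = asyncio.get_running_loop()
        self._timeout = timeout
        self._on_timeout = on_timeout
        self._handle: Optional[asyncio.TimerHandle] = self._loop.call_later(timeout, on_timeout)

    def notify(self) -> None:
        """Record an update and restart the countdown."""
        if self._handle is not None:
            self._handle.cancel()
            self._handle = self._loop.call_later(self._timeout, self._on_timeout)

    def cancel(self) -> None:
        """Stop the countdown; no further callbacks will fire."""
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None


async def demo() -> list[str]:
    events: list[str] = []
    watchdog = TimeoutWatchdog(0.2, lambda: events.append("timeout"))
    await asyncio.sleep(0.05)
    watchdog.notify()  # update arrives in time, countdown restarts
    await asyncio.sleep(0.05)
    events.append("still fresh")
    await asyncio.sleep(0.4)  # no updates: the timeout fires
    watchdog.cancel()
    return events


events = asyncio.run(demo())
```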

katgpucbf.utils.add_time_sync_sensors(sensors: SensorSet) Task[source]

Add a number of sensors to a device server to track time synchronisation.

This must be called with an event loop running. It returns a task that keeps the sensors periodically updated.

katgpucbf.utils.gaussian_dtype(bits: int) dtype[source]

Get numpy dtype for a Gaussian (complex) integer.

Parameters:

bits – Number of bits in each real component
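A Gaussian integer is a complex number whose real and imaginary parts are both integers, and NumPy has no built-in dtype for it. One plausible representation is a structured dtype with two signed integer fields. The sketch below assumes that layout and the field names "real" and "imag"; sketch_gaussian_dtype is a hypothetical name, and the actual function may differ.

```python
import numpy as np


def sketch_gaussian_dtype(bits: int) -> np.dtype:
    """Structured dtype with signed integer real and imaginary parts (assumed layout)."""
    return np.dtype([("real", f"int{bits}"), ("imag", f"int{bits}")])


dtype = sketch_gaussian_dtype(8)
samples = np.zeros(4, dtype=dtype)
samples["real"] = [1, -2, 3, -4]
samples["imag"] = [0, 5, -6, 7]

# Convert to a native complex array when floating-point arithmetic is needed.
as_complex = samples["real"] + 1j * samples["imag"]
```

With bits=8 each element occupies two bytes, compared with eight for complex64.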

katgpucbf.utils.make_rate_limited_sensor(sensor_type: type[T], name: str, description: str = '', units: str = '', default: T | None = None, initial_status: Status = Status.UNKNOWN, *args, **kwargs) Sensor[source]

Create a sensor whose auto strategy has a minimum update interval of MIN_SENSOR_UPDATE_PERIOD.

katgpucbf.utils.make_steady_state_timestamp_sensor() Sensor[int][source]

Create steady-state-timestamp sensor.