katgpucbf.vgpu.engine module

Engine class, which does all the actual processing.

class katgpucbf.vgpu.engine.CaptureConfig(recv_config: RecvConfig, send_config: SendConfig, fir_taps: int, hilbert_taps: int, passband: float, threshold: float, power_int_time: int)[source]

Bases: object

Container for all the configuration needed to run a capture session.

fir_taps: int
hilbert_taps: int
passband: float
power_int_time: int
recv_config: RecvConfig
send_config: SendConfig
threshold: float

class katgpucbf.vgpu.engine.RecordPower(*args, sensors: SensorSet, **kwargs)[source]

Bases: RecordPower

Record power levels to sensors.

record_rms(start: int, length: int, rms: DataArray) → None[source]

Record the RMS values.

The base class does nothing. Subclasses may override it to take action.

The rms array is guaranteed to be backed by a numpy array rather than GPU memory. To make this efficient when using cupy (and not block the pipeline to transfer data to the host), there may be some delay between the chunk being iterated and this callback.

Parameters:
  • start – Sample index of the first sample in the chunk

  • length – Number of samples in the chunk

  • rms – RMS values from the chunk (with non-time dimensions preserved).
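The override pattern described above can be sketched as follows. This is a minimal illustration, not the library's implementation: BaseRecorder and DictRecorder are hypothetical stand-ins, and the rms argument is modelled as a plain dict keyed by polarisation label rather than a DataArray.

```python
import math


class BaseRecorder:
    """Hypothetical stand-in for a base class whose callback does nothing."""

    def record_rms(self, start, length, rms):
        pass


class DictRecorder(BaseRecorder):
    """Hypothetical subclass that keeps the most recent RMS per polarisation."""

    def __init__(self):
        self.latest = {}

    def record_rms(self, start, length, rms):
        # Assumption: rms maps a polarisation label to a host-side RMS value.
        for pol, value in rms.items():
            self.latest[pol] = {
                "start": start,
                "end": start + length,
                "rms": float(value),
            }


def chunk_rms(samples):
    """Root-mean-square of a sequence of real samples."""
    return math.sqrt(sum(s * s for s in samples) / len(samples))


recorder = DictRecorder()
chunk = {"x": [3.0, -3.0, 3.0, -3.0], "y": [1.0, 1.0, -1.0, -1.0]}
recorder.record_rms(
    start=4096,
    length=4,
    rms={pol: chunk_rms(values) for pol, values in chunk.items()},
)
```

Because the callback may run some time after the chunk was iterated, the sketch stores the sample range alongside each value instead of assuming it describes the most recent data.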

class katgpucbf.vgpu.engine.RecvConfig(sync_time: float, adc_sample_rate: float, n_channels: int, n_channels_per_substream: int, n_spectra_per_heap: int, n_samples_between_spectra: int, n_batches_per_chunk: int, sample_bits: int, srcs: list[list[tuple[str, int]]], interface: str | None, ibv: bool, affinity: int, comp_vector: int, buffer_size: int, pols: tuple[str, str])[source]

Bases: object

Container for all the configuration for receiving data.

adc_sample_rate: float
affinity: int
buffer_size: int
comp_vector: int
ibv: bool
interface: str | None
n_batches_per_chunk: int
n_channels: int
n_channels_per_substream: int
n_samples_between_spectra: int
n_spectra_per_heap: int
property pol_labels: list[str]

Incoming polarisations without any ± prefix.
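As a rough illustration of what removing the prefix means, the sketch below strips one leading sign character from each label. The function name strip_sign_prefix is hypothetical, and the assumption that the prefix is a single leading "+" or "-" is not confirmed by this page.

```python
def strip_sign_prefix(pols):
    """Return the labels with at most one leading '+' or '-' removed."""
    labels = []
    for pol in pols:
        # Assumption: the sign, when present, is the first character.
        if pol[:1] in ("+", "-"):
            pol = pol[1:]
        labels.append(pol)
    return labels


labels = strip_sign_prefix(("+x", "-y"))
```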

pols: tuple[str, str]
sample_bits: int
srcs: list[list[tuple[str, int]]]
sync_time: float

class katgpucbf.vgpu.engine.RecvStream(layout: Layout, time_converter: TimeConverter, stream_group: ChunkStreamRingGroup, sensors: SensorSet, pol_labels: tuple[str, str], min_timestamp: int)[source]

Bases: object

Wrap the incoming data stream into a katcbf_vlbi_resample.stream.Stream.

class katgpucbf.vgpu.engine.SendConfig(pols: tuple[str, str], bandwidth: float, n_samples_per_frame: int, rate_factor: float, station: str, dsts: list[tuple[str, int]], interfaces: list[str], buffer_size: int, ttl: int)[source]

Bases: object

Container for all the configuration for sending data.

bandwidth: float
buffer_size: int
dsts: list[tuple[str, int]]
interfaces: list[str]
n_samples_per_frame: int
pols: tuple[str, str]
rate_factor: float
station: str
ttl: int

class katgpucbf.vgpu.engine.VEngine(*, katcp_host: str, katcp_port: int, config: CaptureConfig, monitor: Monitor)[source]

Bases: Engine

Top-level class running the whole thing.

VERSION: str = 'katgpucbf-vgpu-1.0'

async on_stop() → None[source]

Cancel tasks registered with add_cancel_task() on shutdown.

async request_capture_start(ctx: RequestContext, timestamp: int = 0) → None[source]

Start capturing and emitting data.

Parameters:

timestamp – Minimum ADC timestamp at which to enable emitting.
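A caller usually knows the desired start as wall-clock time rather than as an ADC timestamp. The sketch below shows one way to convert, assuming that ADC timestamps count samples since sync_time at adc_sample_rate (the two RecvConfig fields of those names). The helper unix_to_adc_timestamp is hypothetical and the numbers are illustrative only.

```python
import math


def unix_to_adc_timestamp(unix_time, sync_time, adc_sample_rate):
    """Convert a UNIX time to a sample count since sync_time.

    Assumption: timestamps are in units of ADC samples, starting from zero
    at sync_time. Rounding up keeps the result at or after unix_time.
    """
    return math.ceil((unix_time - sync_time) * adc_sample_rate)


# Illustrative values: start 10 s after the sync epoch at 1712 MHz sampling.
start_timestamp = unix_to_adc_timestamp(
    unix_time=1_700_000_010.0,
    sync_time=1_700_000_000.0,
    adc_sample_rate=1712e6,
)
```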

async request_capture_stop(ctx: RequestContext) → None[source]

Stop capturing and emitting data.

async request_vlbi_delay(ctx: RequestContext, delay: float) → None[source]

Set the delay applied to the stream, in seconds.
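The request_* methods above are katcp request handlers. Assuming the Engine base class follows the aiokatcp convention of exposing each request_* method under a name with the prefix removed and underscores replaced by hyphens, the on-the-wire request names can be derived as sketched below. The helper katcp_request_name is hypothetical and only illustrates that naming rule.

```python
def katcp_request_name(method_name: str) -> str:
    """Derive a katcp request name from a handler method name.

    Assumption: handlers are named request_<name> and underscores in
    <name> become hyphens in the katcp request.
    """
    prefix = "request_"
    if not method_name.startswith(prefix):
        raise ValueError(f"{method_name!r} is not a request handler name")
    return method_name[len(prefix):].replace("_", "-")


names = [
    katcp_request_name(method)
    for method in (
        "request_capture_start",
        "request_capture_stop",
        "request_vlbi_delay",
    )
]
```

Under that assumption, a katcp client would send capture-start with an optional timestamp argument, capture-stop with no arguments, and vlbi-delay with a delay in seconds.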