katgpucbf.vgpu.recv module

Handle receiving tied-array-channelised-voltage data.

class katgpucbf.vgpu.recv.Layout(sample_bits: int, n_channels: int, n_channels_per_substream: int, n_spectra_per_heap: int, n_batches_per_chunk: int, heap_timestamp_step: int)[source]

Bases: BaseLayout

Parameters controlling the sizes of heaps and chunks.

Parameters:
  • sample_bits (int) – Bits per sample (for each of real and imaginary).

  • n_channels (int) – Total number of channels.

  • n_channels_per_substream (int) – The number of frequency channels in each beam substream.

  • n_spectra_per_heap (int) – The number of samples on the time axis in each heap.

  • n_batches_per_chunk (int) – Number of heaps per chunk on the time axis.

  • heap_timestamp_step (int) – Increase in timestamp between successive heaps. Timestamps must also be a multiple of this value.

property batch_heaps: int

Number of heaps per chunk, on the axes other than time.

property chunk_batches: int

Number of batches per chunk.

property chunk_timestamp_step: int

Expected increase in timestamp from one chunk to the next.

property heap_bytes: int

Number of payload bytes per heap.

property heap_sample_count: int

Number of samples per heap.

The meaning of samples is up to the concrete subclass. It is used only for statistics.

heap_timestamp_step: int
n_batches_per_chunk: int
n_channels: int
n_channels_per_substream: int
property n_pol_substreams: int

Number of substreams in each polarisation.

n_spectra_per_heap: int
sample_bits: int
katgpucbf.vgpu.recv.MAX_CHUNKS = 2

Number of chunks to allow to be under construction

katgpucbf.vgpu.recv.iter_chunks(ringbuffer: ChunkRingbuffer, layout: Layout, sensors: SensorSet, time_converter: TimeConverter, pol_labels: Sequence[str]) AsyncGenerator[Chunk, None][source]

Iterate over the chunks and update sensors.

It also populates the chunk timestamp.

Parameters:
  • ringbuffer – Source of chunks.

  • layout – Structure of the streams.

  • sensors – Sensor set containing at least the sensors created by make_sensors().

  • time_converter – Converter to turn data timestamps into sensor timestamps.

  • pol_labels – Input polarisation labels (must match those passed to make_stream_group()).

katgpucbf.vgpu.recv.make_stream_group(layout: Layout, data_ringbuffer: ChunkRingbuffer, free_ringbuffer: ChunkRingbuffer, recv_affinity: int, pol_labels: Sequence[str]) ChunkStreamRingGroup[source]

Create a stream group for receiving dual-polarised beam data.

The readers are not added to the streams.

Parameters:
  • layout – Heap size and chunking parameters.

  • data_ringbuffer – Output ringbuffer to which chunks will be sent.

  • free_ringbuffer – Ringbuffer for holding chunks for recycling once they’ve been used.

  • recv_affinity – CPU core affinity for the worker thread. Use -1 to indicate no affinity.

  • pol_labels – Prometheus labels to apply to the polarisations (must have length 2).