katgpucbf.dsim.send module

Transmission of SPEAD data.

class katgpucbf.dsim.send.HeapSet(data: Dataset)

Bases: object

Collection of heaps making up a signal.

The heaps are split into two parts, each of which is preprocessed to allow efficient transmission.

This class should normally be constructed with create().

Parameters:

data

An xarray data set with the following variables:

timestamps

1D array of timestamps, big-endian 64-bit

digitiser_status

2D array of digitiser status values, big-endian 64-bit (indexed by polarisation and time)

payload

2D array of raw sample data (indexed by polarisation and time)

heaps

Heaps referencing the timestamps and payload

The dimensions must be time, pol and data.
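As a minimal sketch of the timestamps variable described above, the following builds a 1D big-endian 64-bit array with NumPy. The heap count, the step between timestamps and the choice of an unsigned type are assumptions made for illustration; the description only requires big-endian 64-bit values.

```python
import numpy as np

n_heaps = 4          # hypothetical number of heaps along the time axis
heap_samples = 4096  # hypothetical step between consecutive timestamps

# Compute in native byte order first, then convert explicitly: arithmetic
# on a byte-swapped array generally produces a native-endian result.
native = np.arange(n_heaps, dtype=np.uint64) * np.uint64(heap_samples)

# ">u8" is big-endian unsigned 64-bit (unsigned is an assumption here).
timestamps = native.astype(">u8")

print(timestamps.dtype, timestamps.tolist())
```

Converting with astype at the end keeps the byte order explicit, which is easy to lose when an array passes through arithmetic.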

classmethod create(timestamps: ndarray, n_substreams: Sequence[int], heap_size: int, digitiser_id: Sequence[int]) → Self

Create from shape parameters.

Parameters:
  • timestamps – The timestamp array to associate with the HeapSet (must be big-endian 64-bit).

  • n_substreams – Number of substreams to distribute the heaps across, per polarisation

  • heap_size – Number of bytes of payload per heap

  • digitiser_id – Digitiser ID to insert into the packets, per polarisation (LSB should indicate polarisation)
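The following sketch shows one way the heap_size and digitiser_id arguments might be derived. Both helper functions are hypothetical and not part of katgpucbf: the first assumes samples are densely bit-packed, and the second assumes a two-polarisation scheme in which a base ID is shifted left by one bit so that the LSB carries the polarisation.

```python
def payload_bytes_per_heap(heap_samples: int, sample_bits: int) -> int:
    """Bytes of payload in one heap, assuming dense bit packing (hypothetical helper)."""
    total_bits = heap_samples * sample_bits
    if total_bits % 8:
        raise ValueError("heap payload is not a whole number of bytes")
    return total_bits // 8


def digitiser_ids(base_id: int) -> list[int]:
    """Per-polarisation IDs with the polarisation in the LSB (hypothetical scheme)."""
    return [(base_id << 1) | pol for pol in (0, 1)]


# Illustrative values only: 4096 samples of 10 bits each, base ID 21.
heap_size = payload_bytes_per_heap(heap_samples=4096, sample_bits=10)
ids = digitiser_ids(21)
print(heap_size, ids)
```

The resulting values could then be passed as the heap_size and digitiser_id arguments of create(), along with a timestamps array and one n_substreams entry per polarisation.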

class katgpucbf.dsim.send.Sender(stream: spead2.send.asyncio.AsyncStream, heap_set: HeapSet, heap_samples: int)

Bases: object

Manage sending packets.

halt() → None

Request run() to stop, but do not wait for it.

async join() → None

Wait for run() to finish.

This does not cause it to stop: use halt() for that.

async run(first_timestamp: int, time_converter: TimeConverter) → None

Send heaps continuously.

async set_heaps(heap_set: HeapSet) → int

Switch out the heap set for a different one.

This does not return until the payload of the previous HeapSet is no longer in use (the timestamps may still be in use).

The new heap_set must share timestamps with the old one.

Returns:

First timestamp which will use the new heap set

Return type:

int

async stop() → None

Stop run() and wait for it to finish.
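The relationship between halt(), join() and stop() can be illustrated with a small stand-in class. ToySender below is hypothetical and is not the katgpucbf implementation: it only mimics the documented contract (halt() requests a stop without waiting, join() waits without requesting a stop, and stop() does both), and its run() takes no arguments and sends nothing.

```python
import asyncio


class ToySender:
    """Stand-in following the halt/join/stop contract; not the real Sender."""

    def __init__(self) -> None:
        self._running = True
        self._finished = asyncio.Event()
        self.heaps_sent = 0

    def halt(self) -> None:
        # Request run() to stop, but do not wait for it.
        self._running = False

    async def join(self) -> None:
        # Wait for run() to finish; this does not ask it to stop.
        await self._finished.wait()

    async def stop(self) -> None:
        # Request a stop, then wait for run() to finish.
        self.halt()
        await self.join()

    async def run(self) -> None:
        try:
            while self._running:
                self.heaps_sent += 1    # placeholder for sending one heap
                await asyncio.sleep(0)  # yield to other tasks
        finally:
            self._finished.set()


async def main() -> int:
    sender = ToySender()
    task = asyncio.create_task(sender.run())
    await asyncio.sleep(0.01)  # let run() make some progress
    await sender.stop()
    await task
    return sender.heaps_sent


heaps_sent = asyncio.run(main())
print(f"sent {heaps_sent} placeholder heaps")
```

Keeping halt() synchronous lets it be called from contexts that cannot await, such as a signal handler, while join() can be awaited separately once shutdown has been requested.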

katgpucbf.dsim.send.make_stream(*, endpoints: Iterable[tuple[str, int]], heap_sets: Iterable[HeapSet], n_pols: int, adc_sample_rate: float, heap_samples: int, sample_bits: int, max_heaps: int, ttl: int, interface_address: str, ibv: bool, affinity: int, comp_vector: int, buffer_size: int) → spead2.send.asyncio.AsyncStream

Create a spead2 stream for sending.

Parameters:
  • endpoints – Destinations (host and port) for all substreams

  • n_pols – Number of single-pol streams to send

  • adc_sample_rate – Sample rate for each single-pol stream, in Hz

  • heap_samples – Number of samples to send in each heap (each heap will be sent as a single packet)

  • sample_bits – Number of bits per sample

  • max_heaps – Maximum number of heaps that may be in flight at once

  • ttl – IP TTL field

  • interface_address – IP address of the interface from which to send the data

  • ibv – If true, use ibverbs for acceleration

  • affinity – If non-negative, bind the sending thread to this CPU core

  • comp_vector – Completion vector for ibverbs
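To show how adc_sample_rate, heap_samples, sample_bits and n_pols interact, the sketch below estimates the heap rate and payload data rate they imply. The helper functions and the numeric values are hypothetical, and the payload figure ignores packet and protocol overhead.

```python
def heaps_per_second(adc_sample_rate: float, heap_samples: int) -> float:
    """Heaps sent per second on one single-pol stream."""
    return adc_sample_rate / heap_samples


def payload_bytes_per_second(adc_sample_rate: float, sample_bits: int, n_pols: int) -> float:
    """Total sample payload rate across all polarisations, excluding overheads."""
    return adc_sample_rate * sample_bits / 8 * n_pols


# Illustrative values only.
adc_sample_rate = 1712e6  # Hz
heap_samples = 4096
sample_bits = 10
n_pols = 2

heap_rate = heaps_per_second(adc_sample_rate, heap_samples)
payload_rate = payload_bytes_per_second(adc_sample_rate, sample_bits, n_pols)
print(f"{heap_rate} heaps/s per pol, {payload_rate / 1e9} GB/s of payload")
```

Since each heap is sent as a single packet, the per-pol heap rate is also the per-pol packet rate, which is a useful figure when sizing max_heaps.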

katgpucbf.dsim.send.make_stream_base(*, config: StreamConfig, endpoints: Iterable[tuple[str, int]], ttl: int, interface_address: str, ibv: bool = False, affinity: int = -1, comp_vector: int = 0, buffer_size: int, memory_regions: list | None = None) → spead2.send.asyncio.AsyncStream

Create a spead2 stream for sending.

This is the low-level support for making either a data or a descriptor stream. Refer to make_stream() for explanations of the arguments.