katgpucbf.dsim.send module
Transmission of SPEAD data.
- class katgpucbf.dsim.send.HeapSet(data: Dataset)
Bases: object
Collection of heaps making up a signal.
The heaps are split into two parts, each of which is preprocessed to allow efficient transmission.
This class should normally be constructed with create().
- Parameters:
data – An xarray data set with the following variables:
- timestamps
1D array of timestamps, big-endian 64-bit
- digitiser_status
2D array of digitiser status values, big-endian 64-bit (indexed by polarisation and time)
- payload
2D array of raw sample data (indexed by polarisation and time)
- heaps
Heaps referencing the timestamps and payload
The dimensions must be time, pol and data.
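As a rough illustration of the "big-endian 64-bit" requirement on timestamps, the sketch below builds such an array with NumPy. The values of heap_samples and n_heaps, the use of an unsigned type, and the choice of one timestamp per heap spaced heap_samples apart are all assumptions made for the example, not something this page specifies.

```python
import numpy as np

# Hypothetical shape parameters, chosen only for illustration.
heap_samples = 4096  # samples per heap
n_heaps = 8          # number of heaps in the signal

# Assumption: one timestamp per heap, counted in samples.
native = np.arange(n_heaps, dtype=np.uint64) * np.uint64(heap_samples)

# Convert to an explicitly big-endian 64-bit dtype (">u8").
timestamps = native.astype(">u8")

# The in-memory bytes are now big-endian regardless of the host CPU.
second_timestamp_bytes = timestamps.tobytes()[8:16]
assert second_timestamp_bytes == (4096).to_bytes(8, "big")
```

An array prepared this way has the byte layout that the timestamps argument of create() asks for.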
- classmethod create(timestamps: ndarray, n_substreams: Sequence[int], heap_size: int, digitiser_id: Sequence[int]) -> Self
Create from shape parameters.
- Parameters:
timestamps – The timestamp array to associate with the HeapSet (must be big-endian 64-bit).
n_substreams – Number of substreams to distribute the heaps across, per polarisation
heap_size – Number of bytes of payload per heap
digitiser_id – Digitiser ID to insert into the packets, per polarisation (LSB should indicate polarisation)
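The per-polarisation arguments can be prepared along these lines. This is only a sketch: the numeric values and the name base_id are hypothetical, and computing heap_size from heap_samples and sample_bits assumes that samples are bit-packed with no padding.

```python
# Hypothetical values, for illustration only.
heap_samples = 4096  # samples per heap
sample_bits = 10     # bits per sample
n_pols = 2

# Assumption: samples are tightly bit-packed, so the payload size in bytes is
# samples * bits / 8.
heap_size = heap_samples * sample_bits // 8

# One digitiser ID per polarisation, with the least significant bit set to the
# polarisation index. base_id is a made-up identifier.
base_id = 0x12
digitiser_id = [(base_id << 1) | pol for pol in range(n_pols)]

# One substream count per polarisation (the count itself is arbitrary here).
n_substreams = [4] * n_pols
```

With these values heap_size is 5120 bytes and the two IDs differ only in their lowest bit, which matches the note that the LSB should indicate polarisation.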
- class katgpucbf.dsim.send.Sender(stream: spead2.send.asyncio.AsyncStream, heap_set: HeapSet, heap_samples: int)
Bases: object
Manage sending packets.
- async join() -> None
Wait for run() to finish.
This does not cause it to stop: use halt() for that.
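The relationship between run(), halt() and join() can be pictured with a toy stand-in. ToySender below is hypothetical and is not the katgpucbf implementation; it only mimics the documented behaviour that join() waits for run() to finish, while halt() is what asks it to stop.

```python
import asyncio


class ToySender:
    """Hypothetical stand-in that mimics the run/halt/join pattern."""

    def __init__(self) -> None:
        self._running = True
        self._finished = asyncio.Event()
        self.heaps_sent = 0

    def halt(self) -> None:
        # Ask run() to stop after the current iteration.
        self._running = False

    async def run(self) -> None:
        # "Send" continuously until halted.
        while self._running:
            self.heaps_sent += 1
            await asyncio.sleep(0)  # yield to other tasks
        self._finished.set()

    async def join(self) -> None:
        # Wait for run() to finish; this does not stop it.
        await self._finished.wait()


async def demo() -> int:
    sender = ToySender()
    task = asyncio.create_task(sender.run())
    await asyncio.sleep(0.01)  # let it run briefly
    sender.halt()              # request the stop
    await sender.join()        # then wait for completion
    await task
    return sender.heaps_sent


heaps_sent = asyncio.run(demo())
```

Calling join() without a preceding halt() would, in this toy model, wait indefinitely, which is the distinction the description above draws.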
- async run(first_timestamp: int, time_converter: TimeConverter) -> None
Send heaps continuously.
- async set_heaps(heap_set: HeapSet) -> int
Switch out the heap set for a different one.
This does not return until the payload of the previous HeapSet is no longer in use (the timestamps may still be in use).
The new heap_set must share timestamps with the old one.
- Returns:
First timestamp which will use the new heap set
- Return type:
timestamp
- katgpucbf.dsim.send.make_stream(*, endpoints: Iterable[tuple[str, int]], heap_sets: Iterable[HeapSet], n_pols: int, adc_sample_rate: float, heap_samples: int, sample_bits: int, max_heaps: int, ttl: int, interface_address: str, ibv: bool, affinity: int, comp_vector: int, buffer_size: int) -> spead2.send.asyncio.AsyncStream
Create a spead2 stream for sending.
- Parameters:
endpoints – Destinations (host and port) for all substreams
n_pols – Number of single-pol streams to send
adc_sample_rate – Sample rate for each single-pol stream, in Hz
heap_samples – Number of samples to send in each heap (each heap will be sent as a single packet)
sample_bits – Number of bits per sample
max_heaps – Maximum number of heaps that may be in flight at once
ttl – IP TTL field
interface_address – IP address of the interface from which to send the data
ibv – If true, use ibverbs for acceleration
affinity – If non-negative, bind the sending thread to this CPU core
comp_vector – Completion vector for ibverbs
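The rate-related parameters combine by simple arithmetic, sketched below. All numeric values and the multicast addresses are hypothetical, and the byte-rate figure assumes bit-packed samples and counts payload only (no packet headers).

```python
# Hypothetical values, for illustration only.
adc_sample_rate = 1712e6  # Hz, per single-pol stream
heap_samples = 4096       # samples per heap
sample_bits = 10          # bits per sample
n_pols = 2

# Heaps produced per second for each single-pol stream.
heaps_per_second = adc_sample_rate / heap_samples

# Payload rate per single-pol stream, assuming tightly packed samples.
payload_bytes_per_second = adc_sample_rate * sample_bits / 8

# Aggregate payload rate across all polarisations, in bits per second.
total_payload_bits_per_second = n_pols * adc_sample_rate * sample_bits

# endpoints is an iterable of (host, port) pairs; these addresses are made up.
endpoints = [("239.10.10.0", 7148), ("239.10.10.1", 7148)]
```

With these example numbers each polarisation produces roughly 418 thousand heaps per second and 2.14 GB/s of payload, which gives a feel for how max_heaps and buffer_size might need to be sized.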
- katgpucbf.dsim.send.make_stream_base(*, config: StreamConfig, endpoints: Iterable[tuple[str, int]], ttl: int, interface_address: str, ibv: bool = False, affinity: int = -1, comp_vector: int = 0, buffer_size: int, memory_regions: list | None = None) -> spead2.send.asyncio.AsyncStream
Create a spead2 stream for sending.
This is the low-level support for making either a data or a descriptor stream. Refer to make_stream() for explanations of the arguments.