katgpucbf.xbgpu.xsend module
Module for sending baseline correlation products onto the network.
- class katgpucbf.xbgpu.xsend.Heap(context: AbstractContext, n_channels_per_substream: int, n_baselines: int, channel_offset: int)
Bases: object
Hold all the data for a heap.
The content of the heap can change, but the class is frozen.
- class katgpucbf.xbgpu.xsend.XSend(output_name: str, n_ants: int, n_channels: int, n_channels_per_substream: int, dump_interval_s: float, send_rate_factor: float, channel_offset: int, context: AbstractContext, stream_factory: Callable[[StreamConfig, Sequence[ndarray]], spead2.send.asyncio.AsyncStream], n_send_heaps_in_flight: int = 5, packet_payload: int = 8192, send_enabled: bool = False)
Bases: Send
Class for turning baseline correlation products into SPEAD heaps and transmitting them.
This class creates a queue of buffers that can be sent out onto the network. To get one of these buffers, call get_free_heap(); it will return a buffer. Once the necessary data has been copied to the buffer and it is ready to be sent onto the network, pass it back to this object using send_heap(). This object creates a limited number of buffers and keeps recycling them, avoiding any memory allocation at runtime.
This has been designed to run in an asyncio loop, and get_free_heap() makes sure that the next buffer in the queue is not in flight before returning.
To allow this class to be used with multiple transports, the constructor takes a factory function to create the stream.
- Parameters:
n_ants – The number of antennas that have been correlated.
n_channels – The total number of channels across all X-Engines. Must be a multiple of n_channels_per_substream.
n_channels_per_substream – The number of frequency channels contained per substream.
dump_interval_s – A new heap is transmitted every dump_interval_s seconds. Set to zero to send as fast as possible.
send_rate_factor –
Configure the spead2 sender with a rate proportional to this factor. This value is intended to dictate a data transmission rate slightly higher/faster than the ADC rate.
Note
A factor of zero (0) tells the sender to transmit as fast as possible.
channel_offset – Fixed value to be included in the SPEAD heap indicating the lowest channel value transmitted by this heap. Must be a multiple of n_channels_per_substream.
context – All buffers to be transmitted will be created from this context.
stream_factory – Callback function that will create the spead2 stream. It is passed the stream configuration and the memory buffers.
n_send_heaps_in_flight – Number of buffers that will be queued at any one time. The data rates are likely too low for this to need tuning; it is exposed mainly to make the value explicit. This argument is optional.
packet_payload – Size in bytes for output packets. This counts the baseline correlation products payload only; headers and padding are added on top of it.
send_enabled – Start with output transmission enabled.
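The divisibility constraints on n_channels and channel_offset listed above can be checked before a sender is constructed. The following is a minimal sketch, not katgpucbf code: `check_channel_layout` is a hypothetical helper, the range check on `channel_offset` is an assumption, and the returned substream count and index are plain arithmetic derived from the parameter descriptions rather than values the library is known to expose.

```python
def check_channel_layout(
    n_channels: int, n_channels_per_substream: int, channel_offset: int
) -> tuple[int, int]:
    """Validate the documented constraints (hypothetical helper).

    Returns (number of substreams, index of the substream that starts at
    channel_offset).
    """
    if n_channels_per_substream <= 0:
        raise ValueError("n_channels_per_substream must be positive")
    if n_channels % n_channels_per_substream != 0:
        raise ValueError("n_channels must be a multiple of n_channels_per_substream")
    if channel_offset % n_channels_per_substream != 0:
        raise ValueError("channel_offset must be a multiple of n_channels_per_substream")
    # Assumption for this sketch: the offset must fall inside the full band.
    if not 0 <= channel_offset < n_channels:
        raise ValueError("channel_offset must lie in [0, n_channels)")
    n_substreams = n_channels // n_channels_per_substream
    substream_index = channel_offset // n_channels_per_substream
    return n_substreams, substream_index
```

For example, 4096 channels split into substreams of 512 channels gives 8 substreams, and a channel_offset of 1024 corresponds to the third of them (index 2).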
- async get_free_heap() → Heap
Return a heap from the internal FIFO queue when one is available.
Only a limited number of heaps exist, and each is stored together with a future object. If the future is complete, the buffer is not being used for sending and this function returns the heap immediately. If the future is still pending, this function waits asynchronously for it to be done.
This function is compatible with asyncio.
- Returns:
Free heap
- Return type:
Heap
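The recycling scheme described for get_free_heap() can be illustrated with a toy model. This is a sketch under stated assumptions, not the katgpucbf implementation: `ToyHeapPool` is a hypothetical class, its buffers are plain `bytearray` objects rather than device or host arrays, and "sending" is simulated with `asyncio.sleep`. It only shows how pairing each buffer with a future in a FIFO queue lets a fixed set of buffers be reused without allocating at runtime.

```python
import asyncio
from collections import deque


class ToyHeapPool:
    """Toy stand-in for a fixed pool of send buffers (not katgpucbf code)."""

    def __init__(self, n_heaps: int) -> None:
        # Must be constructed while an event loop is running.
        loop = asyncio.get_running_loop()
        self._queue: deque = deque()
        for _ in range(n_heaps):
            future = loop.create_future()
            future.set_result(None)  # a fresh buffer is not in flight
            self._queue.append((bytearray(8), future))

    async def get_free_heap(self) -> bytearray:
        buffer, future = self._queue.popleft()
        # Completes immediately if the previous send of this buffer is done,
        # otherwise suspends until it finishes.
        await future
        return buffer

    def send_heap(self, buffer: bytearray) -> None:
        # Simulated transmission; a real sender would hand the buffer to a stream.
        future = asyncio.ensure_future(asyncio.sleep(0.01))
        self._queue.append((buffer, future))


async def demo() -> list[int]:
    pool = ToyHeapPool(n_heaps=2)
    seen = []
    for value in range(5):
        buffer = await pool.get_free_heap()
        buffer[0] = value  # "copy the data in"
        seen.append(id(buffer))
        pool.send_heap(buffer)
    return seen


if __name__ == "__main__":
    ids = asyncio.run(demo())
    print(len(set(ids)), "distinct buffers used for", len(ids), "sends")
```

Each call to get_free_heap() must be matched by a send_heap() before the pool is exhausted; the toy model does not guard against taking more buffers than exist.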
- katgpucbf.xbgpu.xsend.make_item_group(xeng_raw_shape: tuple[int, ...]) → ItemGroup
Create an item group (with no values).
- katgpucbf.xbgpu.xsend.make_stream(*, output_name: str, dest_ip: str, dest_port: int, interface_ip: str, ttl: int, use_ibv: bool, affinity: int, comp_vector: int, buffer_size: int, stream_config: StreamConfig, buffers: Sequence[ndarray]) → spead2.send.asyncio.AsyncStream
Produce a UDP spead2 stream used for transmission.
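Because make_stream() takes keyword-only arguments while the stream_factory passed to XSend is called with just the stream configuration and the buffers, some adapter is needed between the two. The sketch below shows one way this might be done with a closure. It is illustrative only: `stub_make_stream` is a stand-in that mirrors the parameter names of make_stream() but returns a dictionary instead of a spead2 stream, `build_stream_factory` is a hypothetical helper, and all addresses and sizes are placeholder values.

```python
from typing import Any, Callable, Sequence


def stub_make_stream(
    *, output_name, dest_ip, dest_port, interface_ip, ttl, use_ibv,
    affinity, comp_vector, buffer_size, stream_config, buffers,
) -> dict[str, Any]:
    """Stand-in with make_stream's keyword-only parameters (not the real function)."""
    return {
        "output_name": output_name,
        "endpoint": (dest_ip, dest_port),
        "stream_config": stream_config,
        "n_buffers": len(buffers),
    }


def build_stream_factory(**fixed: Any) -> Callable[[Any, Sequence[Any]], dict[str, Any]]:
    """Bind the transport settings now; leave config and buffers for later."""

    def factory(stream_config: Any, buffers: Sequence[Any]) -> dict[str, Any]:
        return stub_make_stream(**fixed, stream_config=stream_config, buffers=buffers)

    return factory


# Placeholder transport settings, chosen only for illustration.
factory = build_stream_factory(
    output_name="baseline-correlation-products",
    dest_ip="239.10.11.0",
    dest_port=7148,
    interface_ip="10.0.0.1",
    ttl=4,
    use_ibv=False,
    affinity=-1,
    comp_vector=0,
    buffer_size=1024 * 1024,
)
```

The resulting `factory` takes exactly the two positional arguments that the stream_factory type describes, so a different transport could be swapped in by supplying another callable with the same shape.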