Source code for katgpucbf.fgpu.postproc

################################################################################
# Copyright (c) 2020-2024, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Postproc module.

These classes handle the operation of the GPU in performing the fine-delay,
per-channel gains, requantisation and corner-turn through a mako-templated
kernel.
"""

from importlib import resources

import numpy as np
from katsdpsigproc import accel
from katsdpsigproc.abc import AbstractCommandQueue, AbstractContext

from .. import N_POLS, utils
from ..curand_helpers import RAND_STATE_DTYPE, RandomStateBuilder
from ..utils import DitherType


[docs] class PostprocTemplate: """Template for the postproc operation. Parameters ---------- context The GPU context that we'll operate in. channels Number of input channels in each spectrum. unzip_factor Radix of the final Cooley-Tukey FFT step performed by the kernel. complex_pfb If true, the PFB is a complex-to-complex transform, and no real-to-complex fixup is needed. Additionally, the DC channel is considered to be the centre of the band i.e. it is written to the middle of the output rather than the start (and similarly, gains for it are loaded from the middle of the gain array etc). out_bits Bits per real/imaginary value. Only 4 or 8 are currently supported. When 4, the real part is in the most-significant bits. dither Type of dithering to apply before quantisation. out_channels Range of channels to write to the output (defaults to all). """ def __init__( self, context: AbstractContext, channels: int, unzip_factor: int = 1, *, complex_pfb: bool, out_bits: int, dither: DitherType, out_channels: tuple[int, int] | None = None, ) -> None: assert dither in {DitherType.NONE, DitherType.UNIFORM} self.block = 16 self.vtx = 1 self.vty = 2 self.channels = channels self.unzip_factor = unzip_factor self.out_bits = out_bits self.dither = dither self.groups_x = accel.divup(channels // unzip_factor // 2 + 1, self.block * self.vtx) if channels <= 0 or channels & (channels - 1): raise ValueError("channels must be a power of 2") if channels % unzip_factor: raise ValueError("channels must be a multiple of unzip_factor") if unzip_factor not in {1, 2, 4}: raise ValueError("unzip_factor must be 1, 2 or 4") if out_bits not in {4, 8}: raise ValueError("out_bits must be 4 or 8") if out_channels is None: self.out_channels = (0, channels) else: if not 0 <= out_channels[0] < out_channels[1] <= channels: raise ValueError("out_channels must be a subrange of [0, channels)") self.out_channels = out_channels with resources.as_file(resources.files(__package__)) as resource_dir: program = accel.build( context, "kernels/postproc.mako", { "block": self.block, "vtx": self.vtx, "vty": self.vty, "groups_x": self.groups_x, "channels": channels, "out_low": self.out_channels[0], "out_high": self.out_channels[1], "out_bits": self.out_bits, "unzip_factor": unzip_factor, "complex_pfb": complex_pfb, "dither": bool(dither.value), }, extra_dirs=[str(resource_dir), str(resource_dir.parent)], ) self.kernel = program.get_kernel("postproc")
[docs] def instantiate( self, command_queue: AbstractCommandQueue, spectra: int, spectra_per_heap: int, *, seed: int, sequence_first: int, sequence_step: int = 1, ) -> "Postproc": """Generate a :class:`Postproc` object based on this template.""" return Postproc( self, command_queue, spectra, spectra_per_heap, seed=seed, sequence_first=sequence_first, sequence_step=sequence_step, )
[docs] class Postproc(accel.Operation): """The fine-delay, requant and corner-turn operations coming after the PFB. .. rubric:: Slots **in** : N_POLS × spectra × unzip_factor × channels // unzip_factor, complex64 Input channelised data for the two polarisations. These are formed by taking the complex-to-complex Fourier transform of the input reinterpreted as a complex input. See :ref:`fgpu-fft` for details. **out** : spectra // spectra_per_heap × out_channels × spectra_per_heap × N_POLS Output F-engine data, quantised and corner-turned, ready for transmission on the network. See :func:`.gaussian_dtype` for the type. **saturated** : spectra // spectra_per_heap × N_POLS, uint32 Number of saturated complex values in **out**. **fine_delay** : spectra × N_POLS, float32 Fine delay in samples (one value per pol). **phase** : spectra × N_POLS, float32 Fixed phase adjustment in radians (one value per pol). **gains** : out_channels × N_POLS, complex64 Per-channel gain (one value per pol). **rand_states** : implementation-defined Random states. This slot is set up by the constructor and should normally not need to be touched. It is only present if dithering is enabled. Parameters ---------- template: PostprocTemplate The template for the post-processing operation. command_queue: AbstractCommandQueue The GPU command queue (typically this will be a CUDA Stream) on which actual processing operations are to be scheduled. spectra: int Number of spectra on which post-prodessing will be performed. spectra_per_heap: int Number of spectra to send out per heap. seed, sequence_first, sequence_step See :class:`.RandomStateBuilder`. These are ignored if the template disables dithering. """ def __init__( self, template: PostprocTemplate, command_queue: AbstractCommandQueue, spectra: int, spectra_per_heap: int, *, seed: int, sequence_first: int, sequence_step: int = 1, ) -> None: super().__init__(command_queue) if spectra % spectra_per_heap != 0: raise ValueError("spectra must be a multiple of spectra_per_heap") heaps = spectra // spectra_per_heap block_y = template.block * template.vty if spectra_per_heap % block_y != 0: raise ValueError(f"spectra_per_heap must be a multiple of {block_y}") self.template = template self.spectra = spectra self.spectra_per_heap = spectra_per_heap self._groups_y = spectra_per_heap // block_y self._heaps = heaps pols = accel.Dimension(N_POLS, exact=True) in_shape = ( accel.Dimension(N_POLS), accel.Dimension(spectra), accel.Dimension(template.unzip_factor, exact=True), accel.Dimension(template.channels // template.unzip_factor, exact=True), ) n_out_channels = template.out_channels[1] - template.out_channels[0] out_dtype = utils.gaussian_dtype(template.out_bits) self.slots["in"] = accel.IOSlot(in_shape, np.complex64) self.slots["out"] = accel.IOSlot((heaps, n_out_channels, spectra_per_heap, pols), out_dtype) self.slots["saturated"] = accel.IOSlot((heaps, pols), np.uint32) self.slots["fine_delay"] = accel.IOSlot((spectra, pols), np.float32) self.slots["phase"] = accel.IOSlot((spectra, pols), np.float32) self.slots["gains"] = accel.IOSlot((n_out_channels, pols), np.complex64) if template.dither == DitherType.UNIFORM: # This could be seen as multi-dimensional, but we flatten it to 1D as an # easy way to guarantee that it is not padded. rand_states_shape = (template.groups_x * self._groups_y * template.block * template.block,) self.slots["rand_states"] = accel.IOSlot(rand_states_shape, RAND_STATE_DTYPE) builder = RandomStateBuilder(command_queue.context) rand_states = builder.make_states( command_queue, rand_states_shape, seed=seed, sequence_first=sequence_first, sequence_step=sequence_step ) self.bind(rand_states=rand_states) def _run(self) -> None: out = self.buffer("out") saturated = self.buffer("saturated") in_ = self.buffer("in") saturated.zero(self.command_queue) self.command_queue.enqueue_kernel( self.template.kernel, [ out.buffer, saturated.buffer, in_.buffer, self.buffer("fine_delay").buffer, self.buffer("phase").buffer, self.buffer("gains").buffer, ] + ([self.buffer("rand_states").buffer] if self.template.dither == DitherType.UNIFORM else []) + [ np.int32(out.padded_shape[1] * out.padded_shape[2]), # out_stride_z np.int32(out.padded_shape[2]), # out_stride np.int32(np.prod(in_.padded_shape[1:])), # in_stride np.int32(self.spectra_per_heap), # spectra_per_heap np.int32(self._heaps), # heaps ], global_size=( self.template.block * self.template.groups_x, self.template.block * self._groups_y, 1, ), local_size=(self.template.block, self.template.block, 1), )