Source code for katgpucbf.fgpu.compute

################################################################################
# Copyright (c) 2020-2025, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""The :class:`Compute` class specifies the sequence of operations on the GPU.

Allocations of memory for input, intermediate and output are also handled here.
"""

from collections.abc import Sequence
from dataclasses import dataclass
from fractions import Fraction

import numpy as np
from katsdpsigproc import accel, fft
from katsdpsigproc.abc import AbstractCommandQueue, AbstractContext

from .. import N_POLS
from ..utils import DitherType
from . import ddc, pfb, postproc


[docs] @dataclass class NarrowbandConfig: """Configuration for a narrowband stream.""" #: Factor by which bandwidth is reduced decimation: int #: Mixer frequency, in cycles per ADC sample mix_frequency: Fraction #: Downconversion filter weights (float) weights: np.ndarray #: Whether we are to discard the outer 50% of channels discard: bool
[docs] class ComputeTemplate: """Template for the channelisation operation sequence. The reason for doing things this way can be read in the relevant `katsdpsigproc docs`_. .. _katsdpsigproc docs: https://katsdpsigproc.readthedocs.io/en/latest/user/operations.html#operation-templates Parameters ---------- context The GPU context that we'll operate in. taps The number of taps that you want the resulting PFB-FIRs to have. channels Number of output channels into which the input data will be decomposed. dig_sample_bits Number of bits per digitiser sample. out_bits Number of bits per output real component. dither Type of dithering to apply before quantisation. narrowband Configuration for narrowband operation. If ``None``, wideband is assumed. """ def __init__( self, context: AbstractContext, taps: int, channels: int, dig_sample_bits: int, out_bits: int, dither: DitherType, narrowband: NarrowbandConfig | None, ) -> None: self.context = context self.taps = taps self.channels = channels self.narrowband = narrowband self.unzip_factor = 4 if channels >= 8 else 1 if narrowband is None: self.internal_channels = channels self.postproc = postproc.PostprocTemplate( context, channels, self.unzip_factor, complex_pfb=False, out_bits=out_bits, dither=dither ) self.pfb_fir = pfb.PFBFIRTemplate( context, taps, channels, dig_sample_bits, self.unzip_factor, n_pols=N_POLS ) self.ddc: ddc.DDCTemplate | None = None else: self.internal_channels = 2 * channels if narrowband.discard else channels subsampling = narrowband.decimation if narrowband.discard else 2 * narrowband.decimation self.postproc = postproc.PostprocTemplate( context, self.internal_channels, self.unzip_factor, complex_pfb=True, out_bits=out_bits, dither=dither, out_channels=(channels // 2, 3 * channels // 2) if narrowband.discard else None, ) self.pfb_fir = pfb.PFBFIRTemplate( context, taps, self.internal_channels, 32, self.unzip_factor, complex_input=True, n_pols=N_POLS ) self.ddc = ddc.DDCTemplate(context, len(narrowband.weights), subsampling, dig_sample_bits)
[docs] def instantiate( self, command_queue: AbstractCommandQueue, samples: int, spectra: int, spectra_per_heap: int, *, seed: int, sequence_first: int, sequence_step: int = 1, ) -> "Compute": # We have to put the return type in quotes because we haven't declared the `Compute` class yet. """Generate a :class:`Compute` object based on the template.""" return Compute( self, command_queue, samples, spectra, spectra_per_heap, seed=seed, sequence_first=sequence_first, sequence_step=sequence_step, )
[docs] class Compute(accel.OperationSequence): """The DSP processing pipeline handling F-engine operation. The actual running of this operation isn't done through the :meth:`_run` method or by calling it directly, if you're familiar with the usual method of `composing operations`_. Fgpu's compute is streaming rather than batched, i.e. we have to coordinate the receiving of new data and the transmission of processed data along with the actual processing operation. Currently, no internal checks for consistency of the parameters are performed. The following constraints are assumed, Bad Things(TM) may happen if they aren't followed: - spectra_per_heap <= spectra - i.e. a chunk of data must be enough to send out at least one heap. - spectra % spectra_per_heap == 0 - samples >= output.window (see :class:`.fgpu.output.Output`). An input chunk requires at least enough samples to output a single spectrum. - samples % 8 == 0 .. _composing operations: https://katsdpsigproc.readthedocs.io/en/latest/user/operations.html#composing-operations Parameters ---------- template Template for the channelisation operation sequence. command_queue The GPU command queue (typically this will be a CUDA Stream) on which actual processing operations are to be scheduled. samples Number of samples in each input chunk (per polarisation), including padding samples. spectra Number of spectra in each output chunk. spectra_per_heap Number of spectra to send in each output heap. seed, sequence_first, sequence_step See :class:`.RandomStateBuilder`. """ def __init__( self, template: ComputeTemplate, command_queue: AbstractCommandQueue, samples: int, spectra: int, spectra_per_heap: int, *, seed: int, sequence_first: int, sequence_step: int = 1, ) -> None: self.template = template self.samples = samples self.spectra = spectra self.spectra_per_heap = spectra_per_heap operations: list[tuple[str, accel.Operation]] = [] # DDC, PFB-FIR and FFT each happen for each polarisation. if template.ddc is None: # Wideband self.ddc: ddc.DDC | None = None else: # Narrowband assert template.narrowband is not None if samples % template.ddc.subsampling != 0: raise ValueError(f"samples ({samples}) must be a multiple of subsampling ({template.ddc.subsampling})") self.ddc = template.ddc.instantiate(command_queue, samples, N_POLS) self.ddc.configure(template.narrowband.mix_frequency, template.narrowband.weights) operations.append(("ddc", self.ddc)) samples = self.ddc.out_samples # Number of samples available to remainder of pipeline self.pfb_fir = template.pfb_fir.instantiate(command_queue, samples, spectra) fft_shape = (N_POLS, spectra, template.unzip_factor, template.internal_channels // template.unzip_factor) fft_template = fft.FftTemplate( template.context, 1, fft_shape, np.complex64, np.complex64, fft_shape, fft_shape, ) self.fft = fft_template.instantiate(command_queue, fft.FftMode.FORWARD) self.postproc = template.postproc.instantiate( command_queue, spectra, spectra_per_heap, seed=seed, sequence_first=sequence_first, sequence_step=sequence_step, ) operations.append(("pfb_fir", self.pfb_fir)) operations.append(("fft", self.fft)) operations.append(("postproc", self.postproc)) compounds = { "fft_work": ["fft:work_area"], "fft_out": ["fft:dest", "postproc:in"], "weights": ["pfb_fir:weights"], "out": ["postproc:out"], "saturated": ["postproc:saturated"], "fine_delay": ["postproc:fine_delay"], "phase": ["postproc:phase"], "gains": ["postproc:gains"], } aliases = {} if template.ddc is None: compounds["in"] = ["pfb_fir:in"] compounds["dig_total_power"] = ["pfb_fir:total_power"] else: compounds["in"] = ["ddc:in"] compounds["subsampled"] = ["ddc:out", "pfb_fir:in"] # pfb_fir:out is an array of real values (in wideband) while # fft:src reinterprets it as an array of complex values. We thus # have to make them aliases to view the memory as different # types. # # Since the dimensions aren't linked, it is important that both # Operations specify the dimensions as unpadded. aliases["fft_in"] = ["pfb_fir:out", "fft:src"] super().__init__(command_queue, operations, compounds, aliases)
[docs] def run_ddc(self, samples: accel.DeviceArray, first_sample: int) -> None: """Run the narrowband DDC kernel on the received samples. Parameters ---------- samples A device array containing the samples. first_sample Timestamp (in samples) of the initial sample. This is used to correctly phase the mixer. """ assert self.ddc is not None self.bind(**{"in": samples}) self.ensure_bound("subsampled") # Compute the fractional part of first_sample * mix_frequency. # Using Fraction avoids the serious rounding errors that would # occur using floating point. assert isinstance(self.ddc.mix_frequency, Fraction) phase = self.ddc.mix_frequency * first_sample phase -= round(phase) self.ddc.mix_phase = phase self.ddc()
def _run_frontend_common( self, in_offsets: Sequence[int], out_offset: int, spectra: int, ) -> None: """Do common parts of :meth:`run_wideband_frontend` and :meth:`run_narrowband_frontend`.""" self.ensure_bound("weights") self.ensure_bound("fft_in") self.pfb_fir.in_offset[:] = in_offsets self.pfb_fir.out_offset = out_offset self.pfb_fir.spectra = spectra self.pfb_fir()
[docs] def run_wideband_frontend( self, samples: accel.DeviceArray, dig_total_power: accel.DeviceArray, in_offsets: Sequence[int], out_offset: int, spectra: int, ) -> None: """Run the PFB-FIR on the received samples, for a wideband pipeline. Parameters ---------- samples A device arrays containing the samples dig_total_power A device array holding digitiser total power for each pol. This is not zeroed. in_offsets Index of first sample in input array to process (one for each pol). out_offset Index of first sample in output array to write. spectra How many spectra worth of samples to push through the PFB-FIR. """ assert self.ddc is None if samples.shape[0] != N_POLS: raise ValueError(f"samples must have size {N_POLS} on the first dimension") if len(in_offsets) != N_POLS: raise ValueError(f"in_offsets must contain {N_POLS} elements") self.bind(**{"in": samples, "dig_total_power": dig_total_power}) self._run_frontend_common(in_offsets, out_offset, spectra)
[docs] def run_narrowband_frontend( self, in_offsets: Sequence[int], out_offset: int, spectra: int, ) -> None: """Run the PFB-FIR on the received samples, for a narrowband pipeline. Parameters ---------- in_offsets Index of first sample in input array to process (one for each pol). out_offset Index of first sample in output array to write. spectra How many spectra worth of samples to push through the PFB-FIR. """ assert self.ddc is not None self.ensure_bound("subsampled") self._run_frontend_common(in_offsets, out_offset, spectra)
[docs] def run_backend(self, out: accel.DeviceArray, saturated: accel.DeviceArray) -> None: """Run the FFT and postproc on the data which has been PFB-FIRed. Postproc incorporates fine-delay, requantisation and corner-turning. Parameters ---------- out Destination for the processed data. """ self.bind(out=out, saturated=saturated) # Note: we only actually need to bind the slots specific to the # backend, but there are quite a few to keep track of, and by the # time the backend is run the frontend slots should all be bound # anyway. self.ensure_all_bound() self.fft() self.postproc()