Source code for katgpucbf.xbgpu.beamform
################################################################################
# Copyright (c) 2023-2024, National Research Foundation (SARAO)
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy
# of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Implement the calculations for beamforming."""
from collections.abc import Sequence
from dataclasses import dataclass
from importlib import resources
import numpy as np
from katsdpsigproc import accel
from katsdpsigproc.abc import AbstractCommandQueue, AbstractContext
from .. import COMPLEX, N_POLS
from ..curand_helpers import RAND_STATE_DTYPE, RandomStateBuilder
from ..utils import DitherType
[docs]
@dataclass
class Beam:
"""Compile-time parameters for a single-polarisation beam."""
#: Either 0 or 1, to indicate which input polarisation to use in the beam.
pol: int
#: The type of dithering to apply before quantisation.
dither: DitherType
def __post_init__(self) -> None:
assert self.pol in {0, 1}
# These are the only implemented types so far
assert self.dither in {DitherType.NONE, DitherType.UNIFORM}
[docs]
class BeamformTemplate:
"""Template for beamforming.
Parameters
----------
context
The GPU context that we'll operate in.
beams
One entry per single-polarisation output beam.
n_spectra_per_batch
Number of samples in time axis for each batch (fine time dimension)
- see :class:`Beamform`.
dither
One entry per single-polarisation output beam. Each entry indicates
the type of dithering to apply before quantisation.
"""
def __init__(self, context: AbstractContext, beams: Sequence[Beam], n_spectra_per_batch: int) -> None:
# TODO: tune these.
self.block_spectra = min(128, n_spectra_per_batch)
self.beams = beams
self.n_spectra_per_batch = n_spectra_per_batch
with resources.as_file(resources.files(__package__)) as resource_dir:
program = accel.build(
context,
"kernels/beamform.mako",
{
"block_spectra": self.block_spectra,
"beams": beams,
"dither_enabled": self._dither_enabled,
"DitherType": DitherType,
},
extra_dirs=[str(resource_dir), str(resource_dir.parent)],
)
self.kernel = program.get_kernel("beamform")
@property
def _dither_enabled(self) -> bool:
"""Whether any of the beams has dithering enabled."""
return any(beam.dither != DitherType.NONE for beam in self.beams)
[docs]
def instantiate(
self,
command_queue: AbstractCommandQueue,
n_batches: int,
n_ants: int,
n_channels_per_substream: int,
seed: int,
sequence_first: int,
sequence_step: int = 1,
) -> "Beamform":
"""Generate a :class:`Beamform` object based on the template."""
return Beamform(
self, command_queue, n_batches, n_ants, n_channels_per_substream, seed, sequence_first, sequence_step
)
[docs]
class Beamform(accel.Operation):
r"""Operation for beamforming.
For ease-of-use with the data formats used in the rest of katgpucbf,
time is split into two dimensions: a coarse outer dimension (called
"batches") and a finer inner dimension ("spectra").
.. rubric:: Slots
**in** : n_batches × n_ants × n_channels_per_substream × n_spectra_per_batch × N_POLS × COMPLEX, int8
Complex (Gaussian integer) input channelised voltages
**out** : n_batches × n_beams × n_channels_per_substream × n_spectra_per_batch × COMPLEX, int8
Complex (Gaussian integer) output channelised voltages
**saturated**: n_beams, uint32
Number of saturated output values, per beam. This value is *incremented*
by the kernel, so should be explicitly zeroed first if desired.
**weights** : n_ants × n_beams, complex64
Complex scale factor to apply to each antenna for each beam
**delays** : n_ants × n_beams, float32
Delay used to compute channel-dependent phase rotation. The
rotation applied is :math:`e^{j\pi cd}` where :math:`c` is
the channel number and :math:`d` is the delay value. Note
that this will not apply any rotation to the first channel
in the data; any such rotation needs to be baked into **weights**.
**rand_states** : n_batches × n_channels_per_substream × n_spectra_per_batch, randState_t (packed)
Independent random states for generating dither values. This is set
up by the constructor and should not normally need to be touched. It
is only present if dithering is enabled on at least one beam.
Parameters
----------
template
The template for the operation
command_queue
The command queue on which to enqueue the work
n_batches
Number of batches (coarse time dimension)
n_ants
Number of antennas
n_channels_per_substream
Number of frequency channels
seed, sequence_first, sequence_step
See :class:`.RandomStateBuilder`. These are ignored if the template
disables dithering.
"""
def __init__(
self,
template: BeamformTemplate,
command_queue: AbstractCommandQueue,
n_batches: int,
n_ants: int,
n_channels_per_substream: int,
seed: int,
sequence_first: int,
sequence_step: int = 1,
) -> None:
super().__init__(command_queue)
self.template = template
pol_dim = accel.Dimension(N_POLS, exact=True)
complex_dim = accel.Dimension(COMPLEX, exact=True)
n_beams = len(template.beams)
n_spectra_per_batch = template.n_spectra_per_batch
builder = RandomStateBuilder(command_queue.context)
self.slots["in"] = accel.IOSlot(
(n_batches, n_ants, n_channels_per_substream, n_spectra_per_batch, pol_dim, complex_dim), np.int8
)
self.slots["out"] = accel.IOSlot(
(n_batches, n_beams, n_channels_per_substream, n_spectra_per_batch, complex_dim), np.int8
)
self.slots["saturated"] = accel.IOSlot((n_beams,), np.uint32)
weights_dims = (n_ants, accel.Dimension(n_beams, exact=True))
self.slots["weights"] = accel.IOSlot(weights_dims, np.complex64)
self.slots["delays"] = accel.IOSlot(weights_dims, np.float32)
if self.template._dither_enabled:
self.slots["rand_states"] = accel.IOSlot(
(
accel.Dimension(n_batches, exact=True),
accel.Dimension(n_channels_per_substream, exact=True),
accel.Dimension(n_spectra_per_batch, exact=True),
),
RAND_STATE_DTYPE,
)
rand_states = builder.make_states(
command_queue,
(n_batches, n_channels_per_substream, n_spectra_per_batch),
seed=seed,
sequence_first=sequence_first,
sequence_step=sequence_step,
)
self.bind(rand_states=rand_states)
def _run(self) -> None:
in_buffer = self.buffer("in")
out_buffer = self.buffer("out")
self.command_queue.enqueue_kernel(
self.template.kernel,
[
out_buffer.buffer,
self.buffer("saturated").buffer,
in_buffer.buffer,
self.buffer("weights").buffer,
self.buffer("delays").buffer,
]
+ ([self.buffer("rand_states").buffer] if self.template._dither_enabled else [])
+ [
np.int32(out_buffer.padded_shape[3]),
np.int32(out_buffer.padded_shape[2] * out_buffer.padded_shape[3]),
np.int32(out_buffer.padded_shape[1] * out_buffer.padded_shape[2] * out_buffer.padded_shape[3]),
np.int32(in_buffer.padded_shape[3]),
np.int32(in_buffer.padded_shape[2] * in_buffer.padded_shape[3]),
np.int32(in_buffer.padded_shape[1] * in_buffer.padded_shape[2] * in_buffer.padded_shape[3]),
np.int32(in_buffer.shape[1]),
np.int32(in_buffer.shape[3]),
],
global_size=(
accel.roundup(in_buffer.shape[3], self.template.block_spectra),
in_buffer.shape[2],
in_buffer.shape[0],
),
local_size=(self.template.block_spectra, 1, 1),
)