Source code for ska_mid_cbf_mcs.testing.param_gen.parameter_generation

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

from __future__ import annotations

import copy
import math
import random
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List

import orjson
from ska_control_model import ObsMode

from ska_mid_cbf_mcs.commons.global_enum import (
    const,
    freq_band_dict,
    supported_values,
)

# Default values for generation
PORT_START = 14000
OUTPUT_HOST = "0.0.0.0"
START_CHANNEL_ID = 0
NUM_TB = 1
PORTS_PER_HOST = 50000


[docs] def scan_config_gen(template_path: Path) -> Dict[str, Any]: """Generate Scan input from template.""" with template_path.open("rb") as opened_file: scan_dict = orjson.loads(opened_file.read()) scan_dict["scan_id"] = int(datetime.now().timestamp()) return scan_dict
[docs] def delay_model_gen( template_path: Path, subarray_id: int, dish_ids: List[str] ) -> Dict[str, Any]: """ Generate delay model from template. :param subarray_id: ID of destination subarray :param dish_ids: list of DISH IDs to generate coefficients for """ with template_path.open("rb") as opened_file: delay_model_dict = orjson.loads(opened_file.read()) delay_model_dict["subarray"] = subarray_id delay_model_dict["start_validity_sec"] = datetime.now().timestamp() delay_model_dict["receptor_delays"] = [] for dish_id in dish_ids: # Coefficient/offset values are unused in MCS, they can be random receptor_delay = { "receptor": dish_id, "xypol_coeffs_ns": [random.random() for _ in range(6)], "ypol_offset_ns": random.random(), } delay_model_dict["receptor_delays"].append(receptor_delay) return delay_model_dict
def _calculate_end_freq(start_freq: int, num_fsps_available: int) -> int: """ Calculate processing region end frequency. :param start_freq: processing region start frequency :param num_fsps_available: number of available FSPs for processing region """ coarse_channel_low = math.floor( (start_freq + const.HALF_FS_BW) / const.FS_BW ) coarse_channel_high = coarse_channel_low + num_fsps_available - 0.01 end_freq = int(coarse_channel_high * const.FS_BW) - const.HALF_FS_BW return end_freq def _calculate_channel_count(start_freq: int, end_freq: int) -> int: """ Calculate the expected channel count to process a given frequency range :param start_freq: Requested started frequency for visibilities :param end_freq: Requested started frequency for visibilities """ channel_count = ( ( (end_freq - const.FINE_CHANNEL_WIDTH - start_freq) // const.FINE_CHANNEL_WIDTH ) // const.NUM_CHANNELS_PER_SPEAD_STREAM ) * const.NUM_CHANNELS_PER_SPEAD_STREAM return channel_count def _gen_output_hosts( ch_start: int, host_start: int, num_hosts: int, ) -> List[List[int, str]]: output_host = [] for host_num in range(host_start, host_start + num_hosts, 1): channel = ch_start + (PORTS_PER_HOST * host_num) host = OUTPUT_HOST[:-1] + str(host_num) output_host.append([channel, host]) return output_host def _gen_output_ports( ch_start: int, port_start: int, host_start: int, num_hosts: int, ) -> List[List[int]]: """ Generate list of destination output ports :param num_ports: number of ports :param ch_start: first fine channel ID :param port_start: first port """ output_port = [] for host_num in range(host_start, host_start + num_hosts, 1): channel = ch_start + (PORTS_PER_HOST * host_num) output_port.append([channel, port_start, 1]) return output_port def _corr_config_gen( freq_band: str, fsp_ids: set, ) -> Dict[str, Any]: """ Generate correlation processing regions. """ num_fsp = len(fsp_ids) num_fs = freq_band_dict[freq_band]["num_frequency_slices"] num_pr = (num_fsp // num_fs) + 1 correlation = {"processing_regions": []} port_start = PORT_START sdp_start_channel_id = START_CHANNEL_ID for _ in range(num_pr): pr_config = {} # Just fill out each PR with the max number of FSPs pr_fsp_ids = [] for _ in range(num_fs): try: pr_fsp_ids.append(fsp_ids.pop()) except KeyError: break if len(pr_fsp_ids) == 0: break pr_config["fsp_ids"] = pr_fsp_ids num_fsp_pr = len(pr_fsp_ids) # Just set start_freq to beginning of band start_freq = ( freq_band_dict[freq_band]["range_hz"][0] + const.FINE_CHANNEL_WIDTH // 2 ) end_freq = min( _calculate_end_freq(start_freq, num_fsp_pr), freq_band_dict[freq_band]["range_hz"][1], ) channel_count = _calculate_channel_count(start_freq, end_freq) pr_config["start_freq"] = start_freq pr_config["channel_count"] = channel_count pr_config["channel_width"] = const.FINE_CHANNEL_WIDTH pr_config["integration_factor"] = random.randint(1, 10) # Output mapping pr_config["sdp_start_channel_id"] = sdp_start_channel_id num_ports = channel_count // const.NUM_CHANNELS_PER_SPEAD_STREAM host_start = 0 num_hosts = max(num_ports // PORTS_PER_HOST, 1) pr_config["output_host"] = _gen_output_hosts( sdp_start_channel_id, host_start, num_hosts, ) pr_config["output_link_map"] = [[sdp_start_channel_id, 1]] pr_config["output_port"] = _gen_output_ports( sdp_start_channel_id, port_start, host_start, num_hosts ) # send the next processing region to the next host host_start += num_hosts sdp_start_channel_id += channel_count # When unspecified, all receptors assigned to the subarray are used. correlation["processing_regions"].append(pr_config) return correlation def _pst_config_gen( freq_band: str, fsp_ids: set, dish_ids: List[str], ) -> Dict[str, Any]: """ Generate PST processing regions. """ # TODO: untested, probably needs to be fixed for correct PST config gen num_fsp = len(fsp_ids) num_fs = freq_band_dict[freq_band]["num_frequency_slices"] num_pr = (num_fsp // num_fs) + 1 pst_bf = {"processing_regions": []} port_start = PORT_START pst_start_channel_id = START_CHANNEL_ID dish_ids_unused = set(dish_ids) for pr_index in range(num_pr): pr_config = {} # Just fill out each PR with the max number of FSPs pr_fsp_ids = [] for _ in range(num_fs): try: pr_fsp_ids.append(fsp_ids.pop()) except KeyError: break if len(pr_fsp_ids) == 0: break pr_config["fsp_ids"] = pr_fsp_ids num_fsp_pr = len(pr_fsp_ids) # Just set start_freq to beginning of band start_freq = freq_band_dict[freq_band]["range_hz"][0] end_freq = _calculate_end_freq(start_freq, num_fsp_pr) channel_count = _calculate_channel_count(start_freq, end_freq) # TODO: 3700 fixed for AA1? pr_config["channel_count"] = channel_count pr_config["pst_start_channel_id"] = pst_start_channel_id # Timing beams # TODO only 1 beam in AA1? pr_config["timing_beams"] = [] for timing_beam_id in range(1, NUM_TB + 1): tb_config = {} tb_config["timing_beam_id"] = timing_beam_id # Randomly select a subset of receptors to use in this PR, or do not specify, # but make sure by the end that all DISH IDs are used at least once. # When unspecified, all receptors assigned to the subarray are used. tb_config["receptors"] = [] receptors = random.sample( dish_ids, random.randint(1, len(dish_ids) if len(dish_ids) > 1 else 1), ) if random.choice([True, False]): tb_config["receptors"] = sorted(receptors) dish_ids_unused.difference_update(receptors) else: del tb_config["receptors"] dish_ids_unused = set() if ( pr_index + 1 == num_pr and timing_beam_id == NUM_TB and len(dish_ids_unused) != 0 ): del pr_config["receptors"] tb_config["output_link_map"] = [[pst_start_channel_id, 1]] tb_config["output_host"] = [[pst_start_channel_id, OUTPUT_HOST]] # PST outputs all channels to the same port tb_config["output_port"] = [[pst_start_channel_id, port_start]] pr_config["timing_beams"].append(tb_config) # send the next processing region to a different port port_start += 500 pst_start_channel_id += channel_count # Update start_freq pst_bf.append(pr_config) return pst_bf
[docs] def configure_scan_config_gen( template_path: Path, subarray_id: int, frequency_band: str, fsp_modes: Dict[int, ObsMode], dish_ids: List[str], ) -> Dict[str, Any]: """ Generates the configure scan configuration as a dict. :param template_path: path to configure scan JSON template :param subarray_id: subarray ID to use for configuration :param frequency_band: frequency band to use for configuration :param fsp_modes: map of FSP IDs and ObsMode values to use for configuration :param dish_ids: list of DISH IDs to use for configuration """ with template_path.open("rb") as opened_file: template = orjson.loads(opened_file.read()) cfg = copy.deepcopy( template["10.0"] ) # TODO: update as telmodel version increases # "common" if frequency_band not in supported_values["frequency_band"]: raise Exception( f"Frequency band {frequency_band} not currently supported." ) cfg["common"]["frequency_band"] = frequency_band cfg["common"]["subarray_id"] = subarray_id cfg["common"][ "config_id" ] = f"configure_scan_{time.time()}_{len(fsp_modes)}_FSP_band_{frequency_band}" # "midcbf" fsp_corr = set() fsp_pst = set() for fsp_id, obs_mode in fsp_modes.items(): if obs_mode == ObsMode.IMAGING: fsp_corr.add(fsp_id) if obs_mode == ObsMode.PULSAR_TIMING: fsp_pst.add(fsp_id) cfg["midcbf"][ "delay_model_subscription_point" ] = f"ska_mid/tm_leaf_node/csp_subarray_{subarray_id:02}/delayModel" # "correlation" if len(fsp_corr) == 0: del cfg["midcbf"]["correlation"] else: cfg["midcbf"]["correlation"] = _corr_config_gen( freq_band=frequency_band, fsp_ids=fsp_corr ) # "pst_bf" if len(fsp_pst) == 0: del cfg["midcbf"]["pst_bf"] else: cfg["midcbf"]["pst_bf"] = _pst_config_gen( freq_band=frequency_band, fsp_ids=fsp_pst, dish_ids=dish_ids, ) return cfg
[docs] def assign_or_release_resources_gen( template_path: Path, dish_ids: List[str], ) -> Dict[str, Any]: """ generates the dictionary for AssignResources or ReleaseResources because the two commands have the same structure (just different interface names). """ with template_path.open("rb") as opened_file: assign_or_release_resources_dict = orjson.loads(opened_file.read()) assign_or_release_resources_dict["receptor_ids"] = dish_ids return assign_or_release_resources_dict