# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.
from __future__ import annotations
import copy
import math
import random
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
import orjson
from ska_control_model import ObsMode
from ska_mid_cbf_mcs.commons.global_enum import (
const,
freq_band_dict,
supported_values,
)
# Default values for generation
# First destination port assigned to generated processing regions.
PORT_START = 14000
# Base output host address: correlation host entries replace its last
# character with the host number, PST timing beams use it unchanged.
OUTPUT_HOST = "0.0.0.0"
# Start channel ID given to the first generated processing region.
START_CHANNEL_ID = 0
# Number of timing beams generated per PST processing region.
NUM_TB = 1
# Divisor used to derive the host count from the port count, and the
# per-host channel offset used in the output host/port maps.
PORTS_PER_HOST = 50000
[docs]
def scan_config_gen(template_path: Path) -> Dict[str, Any]:
    """
    Generate Scan command input from a JSON template.

    :param template_path: path to the Scan JSON template file
    :return: the parsed template with "scan_id" overwritten by the current
        timestamp truncated to an integer
    """
    scan_config = orjson.loads(template_path.read_bytes())
    # The current time is used as the scan ID value.
    scan_config["scan_id"] = int(datetime.now().timestamp())
    return scan_config
[docs]
def delay_model_gen(
    template_path: Path, subarray_id: int, dish_ids: List[str]
) -> Dict[str, Any]:
    """
    Generate a delay model from a JSON template.

    :param template_path: path to the delay model JSON template file
    :param subarray_id: ID of destination subarray
    :param dish_ids: list of DISH IDs to generate coefficients for
    :return: the parsed template with "subarray", "start_validity_sec" and
        "receptor_delays" overwritten
    """
    model = orjson.loads(template_path.read_bytes())
    model["subarray"] = subarray_id
    model["start_validity_sec"] = datetime.now().timestamp()
    # Coefficient/offset values are unused in MCS, so random numbers suffice.
    model["receptor_delays"] = [
        {
            "receptor": dish_id,
            "xypol_coeffs_ns": [random.random() for _ in range(6)],
            "ypol_offset_ns": random.random(),
        }
        for dish_id in dish_ids
    ]
    return model
def _calculate_end_freq(start_freq: int, num_fsps_available: int) -> int:
    """
    Calculate processing region end frequency.

    :param start_freq: processing region start frequency
    :param num_fsps_available: number of available FSPs for processing region
    :return: processing region end frequency
    """
    # Index of the coarse channel that contains start_freq.
    first_coarse_channel = math.floor(
        (start_freq + const.HALF_FS_BW) / const.FS_BW
    )
    # Span one coarse channel per available FSP, stopping 0.01 of a coarse
    # channel short of the following one.
    upper_edge = first_coarse_channel + num_fsps_available - 0.01
    return int(upper_edge * const.FS_BW) - const.HALF_FS_BW
def _calculate_channel_count(start_freq: int, end_freq: int) -> int:
    """
    Calculate the expected channel count to process a given frequency range.

    :param start_freq: requested start frequency for visibilities
    :param end_freq: requested end frequency for visibilities
    :return: number of fine channels, rounded down to a whole multiple of
        const.NUM_CHANNELS_PER_SPEAD_STREAM
    """
    fine_width = const.FINE_CHANNEL_WIDTH
    stream_size = const.NUM_CHANNELS_PER_SPEAD_STREAM
    fine_channels = (end_freq - fine_width - start_freq) // fine_width
    # Only whole SPEAD streams are counted.
    return (fine_channels // stream_size) * stream_size
def _gen_output_hosts(
    ch_start: int,
    host_start: int,
    num_hosts: int,
) -> List[List[Any]]:
    """
    Generate list of destination output hosts.

    :param ch_start: first channel ID
    :param host_start: number of the first host to generate
    :param num_hosts: number of consecutive hosts to generate
    :return: list of [channel ID, host address] pairs, one per host
    """
    # The host address is the default address with its last character
    # replaced by the host number.
    host_prefix = OUTPUT_HOST[:-1]
    return [
        [ch_start + (PORTS_PER_HOST * host_num), host_prefix + str(host_num)]
        for host_num in range(host_start, host_start + num_hosts)
    ]
def _gen_output_ports(
    ch_start: int,
    port_start: int,
    host_start: int,
    num_hosts: int,
) -> List[List[int]]:
    """
    Generate list of destination output ports.

    :param ch_start: first fine channel ID
    :param port_start: first port
    :param host_start: number of the first host
    :param num_hosts: number of consecutive hosts
    :return: list of [channel ID, port_start, 1] entries, one per host
    """
    host_numbers = range(host_start, host_start + num_hosts)
    # NOTE(review): the trailing 1 is presumably the port increment --
    # confirm against the ConfigureScan schema.
    return [
        [ch_start + (PORTS_PER_HOST * host_num), port_start, 1]
        for host_num in host_numbers
    ]
def _corr_config_gen(
    freq_band: str,
    fsp_ids: set,
) -> Dict[str, Any]:
    """
    Generate correlation processing regions.

    FSP IDs are popped from ``fsp_ids`` (the caller's set is consumed) and
    grouped into processing regions holding at most the band's
    "num_frequency_slices" FSPs each.

    :param freq_band: key into freq_band_dict
    :param fsp_ids: set of FSP IDs to distribute over processing regions
    :return: dict with a "processing_regions" list
    """
    band = freq_band_dict[freq_band]
    max_fsps_per_pr = band["num_frequency_slices"]
    max_regions = (len(fsp_ids) // max_fsps_per_pr) + 1

    regions = []
    port_start = PORT_START
    next_channel_id = START_CHANNEL_ID

    for _ in range(max_regions):
        # Just fill out each PR with the max number of FSPs
        region_fsps = []
        while fsp_ids and len(region_fsps) < max_fsps_per_pr:
            region_fsps.append(fsp_ids.pop())
        if not region_fsps:
            break

        # Just set start_freq to beginning of band
        start_freq = band["range_hz"][0] + const.FINE_CHANNEL_WIDTH // 2
        end_freq = min(
            _calculate_end_freq(start_freq, len(region_fsps)),
            band["range_hz"][1],
        )
        channel_count = _calculate_channel_count(start_freq, end_freq)

        # Output mapping
        stream_count = channel_count // const.NUM_CHANNELS_PER_SPEAD_STREAM
        # NOTE(review): host numbering restarts at 0 for every processing
        # region, so all regions map to the same first host. An earlier
        # comment suggested each region should move on to the next host --
        # confirm the intended behaviour.
        host_start = 0
        num_hosts = max(stream_count // PORTS_PER_HOST, 1)

        # No receptors are listed: when unspecified, all receptors assigned
        # to the subarray are used.
        regions.append(
            {
                "fsp_ids": region_fsps,
                "start_freq": start_freq,
                "channel_count": channel_count,
                "channel_width": const.FINE_CHANNEL_WIDTH,
                "integration_factor": random.randint(1, 10),
                "sdp_start_channel_id": next_channel_id,
                "output_host": _gen_output_hosts(
                    next_channel_id, host_start, num_hosts
                ),
                "output_link_map": [[next_channel_id, 1]],
                "output_port": _gen_output_ports(
                    next_channel_id, port_start, host_start, num_hosts
                ),
            }
        )

        # The next processing region continues where this one's channels end.
        next_channel_id += channel_count

    return {"processing_regions": regions}
def _pst_config_gen(
    freq_band: str,
    fsp_ids: set,
    dish_ids: List[str],
) -> Dict[str, Any]:
    """
    Generate PST processing regions.

    FSP IDs are popped from ``fsp_ids`` (the caller's set is consumed) and
    grouped into processing regions holding at most the band's
    "num_frequency_slices" FSPs each. Every region gets NUM_TB timing beams,
    each of which either lists a random subset of ``dish_ids`` or leaves
    "receptors" unspecified.

    :param freq_band: key into freq_band_dict
    :param fsp_ids: set of FSP IDs to distribute over processing regions
    :param dish_ids: list of DISH IDs available for the timing beams
    :return: dict with a "processing_regions" list
    """
    # TODO: untested, probably needs to be fixed for correct PST config gen
    num_fsp = len(fsp_ids)
    num_fs = freq_band_dict[freq_band]["num_frequency_slices"]
    num_pr = (num_fsp // num_fs) + 1

    pst_bf = {"processing_regions": []}
    port_start = PORT_START
    pst_start_channel_id = START_CHANNEL_ID
    dish_ids_unused = set(dish_ids)

    for _ in range(num_pr):
        pr_config = {}

        # Just fill out each PR with the max number of FSPs
        pr_fsp_ids = []
        while fsp_ids and len(pr_fsp_ids) < num_fs:
            pr_fsp_ids.append(fsp_ids.pop())
        if not pr_fsp_ids:
            break
        pr_config["fsp_ids"] = pr_fsp_ids
        num_fsp_pr = len(pr_fsp_ids)

        # Once every FSP ID has been handed out, no further processing
        # region will be generated, so this one is the last. (Comparing the
        # loop index with num_pr misses the case where the FSP count is an
        # exact multiple of num_fs.)
        is_last_pr = not fsp_ids

        # Just set start_freq to beginning of band
        start_freq = freq_band_dict[freq_band]["range_hz"][0]
        end_freq = _calculate_end_freq(start_freq, num_fsp_pr)
        channel_count = _calculate_channel_count(start_freq, end_freq)

        # TODO: 3700 fixed for AA1?
        pr_config["channel_count"] = channel_count
        pr_config["pst_start_channel_id"] = pst_start_channel_id

        # Timing beams
        # TODO only 1 beam in AA1?
        pr_config["timing_beams"] = []
        for timing_beam_id in range(1, NUM_TB + 1):
            tb_config = {"timing_beam_id": timing_beam_id}

            # Randomly select a subset of receptors to use in this beam, or
            # do not specify any, but make sure by the end that all DISH IDs
            # are used at least once.
            # When unspecified, all receptors assigned to the subarray are
            # used.
            if dish_ids:
                receptors = random.sample(
                    dish_ids, random.randint(1, len(dish_ids))
                )
                if random.choice([True, False]):
                    tb_config["receptors"] = sorted(receptors)
                    dish_ids_unused.difference_update(receptors)
                else:
                    # Unspecified receptors cover every DISH ID.
                    dish_ids_unused = set()
            # With no DISH IDs there is nothing to sample from, so the beam
            # is left with unspecified receptors.

            # Last beam of the last region: if some DISH IDs were never
            # selected, leave the receptors unspecified so all are covered.
            if is_last_pr and timing_beam_id == NUM_TB and dish_ids_unused:
                tb_config.pop("receptors", None)
                dish_ids_unused = set()

            tb_config["output_link_map"] = [[pst_start_channel_id, 1]]
            tb_config["output_host"] = [[pst_start_channel_id, OUTPUT_HOST]]
            # PST outputs all channels to the same port
            tb_config["output_port"] = [[pst_start_channel_id, port_start]]
            pr_config["timing_beams"].append(tb_config)

        # send the next processing region to a different port
        port_start += 500
        pst_start_channel_id += channel_count

        pst_bf["processing_regions"].append(pr_config)

    return pst_bf
[docs]
def assign_or_release_resources_gen(
    template_path: Path,
    dish_ids: List[str],
) -> Dict[str, Any]:
    """
    Generate AssignResources or ReleaseResources input from a JSON template.

    Both commands share the same structure (only the interface names
    differ), so a single generator serves both.

    :param template_path: path to the command's JSON template file
    :param dish_ids: list of DISH IDs to assign or release
    :return: the parsed template with "receptor_ids" set to ``dish_ids``
    """
    resources = orjson.loads(template_path.read_bytes())
    resources["receptor_ids"] = dish_ids
    return resources