# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.
from __future__ import annotations
import os
from collections.abc import Iterable
from itertools import chain
from queue import Full, Queue
from threading import Event, Lock, Thread
from typing import Any, Callable, Dict, Generator, List, Optional
import orjson
import tango
from ska_control_model import (
AdminMode,
CommunicationStatus,
ObsMode,
ObsState,
PowerState,
ResultCode,
TaskStatus,
)
from ska_mid_cbf_common.events.event_handler import EventHandler
from ska_schemas.schema import validate as schema_validate
from ska_tango_testing import context
from ska_mid_cbf_mcs.base.obs.obs_component_manager import (
CbfObsComponentManager,
)
from ska_mid_cbf_mcs.commons.dish_utils import DISHUtils
from ska_mid_cbf_mcs.commons.global_enum import (
const,
freq_band_dict,
obs_mode_key_to_name,
param_keys,
supported_values,
)
from ska_mid_cbf_mcs.commons.validate_interface import (
CBF_ABORT_VER1_0,
CBF_GOTOIDLE_VER1_0,
CBF_OBSRESET_VER1_0,
CBF_RESTART_VER1_0,
validate_interface,
)
from ska_mid_cbf_mcs.commons.vcc_unit_utils import VCCUnitUtil
from ska_mid_cbf_mcs.subarray.fsp_configuration.generator import fsp_config_gen
from ska_mid_cbf_mcs.subarray.scan_configuration_validator.validator import (
SubarrayScanConfigurationValidator,
)
from ska_mid_cbf_mcs.subarray.vcc_all_bands_configuration.generator import (
vcc_config_gen,
)
__all__ = ["CbfSubarrayComponentManager"]
COMMAND_ASSIGN_RESOURCES = "AssignResources"
COMMAND_RELEASE_RESOURCES = "ReleaseResources"
COMMAND_RELEASE_ALL_RESOURCES = "ReleaseAllResources"
COMMAND_RESTART = "Restart"
[docs]
class CbfSubarrayComponentManager(CbfObsComponentManager):
"""
A component manager for the CbfSubarray device.
"""
def __init__(
self: CbfSubarrayComponentManager,
*args: Any,
subarray_id: int,
controller: str,
vcc_all_bands: List[str],
fsp: List[str],
fsp_corr_sub: List[str],
**kwargs: Any,
) -> None:
"""
Initialise a new instance.
:param subarray_id: ID of subarray
:param controller: FQDN of controller device
:param vcc_all_bands: FQDNs of subordinate VCCAllBands devices
:param fsp: FQDNs of subordinate FSP devices
:param fsp_corr_sub: FQDNs of subordinate FSP CORR subarray devices
"""
super().__init__(*args, **kwargs)
# Configuration management
self._assigned_dish_ids = set()
self._subarray_id = subarray_id
self._dish_utils = None
self._vcc_unit_util = None
# Device FQDNs
self._fqdn_controller = controller
self._fqdn_vcc_all_bands = vcc_all_bands
self._fqdn_fsp = fsp
self._fqdn_fsp_corr = fsp_corr_sub
# Delay model management
# TODO: maxsize could be just 1 or 2?
self._delay_model_queue = Queue(maxsize=64)
self._delay_model_thread: Thread = None
self._fsp_subarray_scan_start_time_events = {}
self._scan_start_time_rounded_queue = Queue(
maxsize=const.MAX_FSP * const.MAX_SUBARRAY
)
self._scan_start_time_rounded_thread = None
# for easy device-reference
self._rfi_flagging_mask = {}
self._frequency_band_offset_stream1 = 0
self._frequency_band_offset_stream2 = 0
self._stream_tuning = [0, 0]
# DeviceProxy management
self._assigned_vcc_ids = set()
self._configured_vcc_ids = set()
self._assigned_fsp_ids = set()
self._configured_fsp_mode_ids = {
ObsMode.IMAGING.name: set(),
ObsMode.PULSAR_TIMING.name: set(),
ObsMode.PULSAR_SEARCH.name: set(),
}
# Mutex Lock for _scan_started, to sync FSP Output Time
self._scan_started_lock = Lock()
self._scan_started = False
# ------------------
# Mutex Lock Setters
# ------------------
def _update_scan_started(self: CbfSubarrayComponentManager, value: bool):
"""
Sets the _scan_started flag while invoking a mutex lock
:param value: The boolean value to set _scan_started
"""
with self._scan_started_lock:
self._scan_started = value
def _read_scan_started(self: CbfSubarrayComponentManager) -> bool:
"""
Reads the _scan_started flag while invoking a mutex lock
:param value: The boolean value to set _scan_started
"""
with self._scan_started_lock:
return self._scan_started
# -------------
# Communication
# -------------
def _init_proxies(self: CbfSubarrayComponentManager) -> bool:
"""
Initialize proxies to FSP and VCC subelements
:return: True if proxy initialization succeed, otherwise False
"""
self.proxy_dict = {
const.CONTROLLER: None,
const.VCC_ALL_BANDS: {},
const.FSP: {},
ObsMode.IMAGING.name: {},
ObsMode.PULSAR_TIMING.name: {},
ObsMode.PULSAR_SEARCH.name: {},
}
try:
self.proxy_dict[const.CONTROLLER] = context.DeviceProxy(
device_name=self._fqdn_controller,
)
for vcc_id, fqdn in enumerate(self._fqdn_vcc_all_bands, 1):
self.proxy_dict[const.VCC_ALL_BANDS][vcc_id] = (
context.DeviceProxy(
device_name=fqdn,
)
)
for fsp_id, (fsp_fqdn, fsp_corr_fqdn) in enumerate(
zip(self._fqdn_fsp, self._fqdn_fsp_corr), 1
):
self.proxy_dict[const.FSP][fsp_id] = context.DeviceProxy(
device_name=fsp_fqdn
)
self.proxy_dict[ObsMode.IMAGING.name][fsp_id] = (
context.DeviceProxy(device_name=fsp_corr_fqdn)
)
except tango.DevFailed as dev_failed:
self.logger.error(f"{dev_failed}")
return False
return True
def _get_vcc_unit_config(self: CbfSubarrayComponentManager) -> bool:
"""
Retrieve vcc unit configuration data from the CBF Controller
"""
try:
# get_property returns a dict of requested properties
vcc_id_to_vcc_unit_id_mapping = self.proxy_dict[
const.CONTROLLER
].get_property("VCCIDToVCCUnitIDMapping")[
"VCCIDToVCCUnitIDMapping"
]
except tango.DevFailed as dev_failed:
self.logger.error(
f"failed to get VCC ID to VCC Unit ID mapping from the controller; {dev_failed}"
)
return False
self._vcc_unit_util = VCCUnitUtil(vcc_id_to_vcc_unit_id_mapping)
return True
[docs]
def start_communicating(
self: CbfSubarrayComponentManager,
*args: Any,
admin_mode: AdminMode,
**kwargs: Any,
) -> None:
"""
Thread for start_communicating operation.
"""
success = self._init_proxies()
if not success:
self.logger.error("Failed to initialize proxies.")
self._update_communication_state(
communication_state=CommunicationStatus.NOT_ESTABLISHED
)
return
success = self._get_vcc_unit_config()
if not success:
self.logger.error("Failed to retrieve vcc unit configuration")
self._update_communication_state(
communication_state=CommunicationStatus.NOT_ESTABLISHED
)
return
super().start_communicating(admin_mode=admin_mode)
self._update_component_state(power=PowerState.ON)
# --------------------
# Delay Model Handling
# --------------------
[docs]
def delay_model_event_callback(
self: CbfSubarrayComponentManager, event_data: tango.EventData
) -> None:
""" "
Callback for delayModel change event subscription.
:param event_data: the received change event data
"""
# NOTE (CIP-3559): event callbacks must be as short as possible
# Here we simply queue event data and handle possible exceptions
try:
self._delay_model_queue.put_nowait(event_data)
except Full:
# Start dropping data if queue is full
self._delay_model_queue.get_nowait()
self._delay_model_queue.task_done()
self._delay_model_queue.put(event_data)
def _update_delay_model(self: CbfSubarrayComponentManager) -> None:
"""
Update FSP delay models.
This method is always started in a separate thread.
"""
# Indicate that this thread performs Tango operations
with tango.EnsureOmniThread():
self.logger.info("Starting delay model processing thread.")
while True:
event_data = self._delay_model_queue.get()
self._delay_model_queue.task_done()
if event_data is EventHandler.EXIT_CONDITION:
# This ends the thread
break
# No need to check event data validity as this is done in EventTracer,
# just check if subarray is in the proper state to ingest the data.
model = event_data.attr_value.value
current_model = str(self._device.delay_model_signal)
if (
not self.is_communicating
or model == ""
or model == current_model
or self._device.obs_state_signal
not in [
ObsState.CONFIGURING,
ObsState.READY,
ObsState.SCANNING,
]
):
self.logger.warning(f"Ignoring delay data; {model}")
continue
# Validate delay model against telmodel schema
try:
delay_model_json = orjson.loads(model)
self.logger.debug(
f"Attempting to validate the following delay model JSON against the telescope model: {delay_model_json}"
)
schema_validate(
version=delay_model_json["interface"],
config=delay_model_json,
strictness=1,
)
self.logger.debug("Delay model is valid!")
except (orjson.JSONDecodeError, ValueError) as error:
self.logger.error(
f"Delay model JSON validation against the telescope model schema failed, ignoring delay model; {error}."
)
continue
# Validate start_validity_sec
current_start_validity_sec = orjson.loads(current_model)[
"start_validity_sec"
]
new_start_validity_sec = delay_model_json["start_validity_sec"]
if new_start_validity_sec < current_start_validity_sec:
self.logger.warning(
f"Ignoring delay model; start_validity_sec ({new_start_validity_sec}) is earlier than current delay model ({current_start_validity_sec})."
)
continue
# Validate DISH IDs, then convert them to VCC ID integers for FSPs
valid_dish_ids = True
for delay_detail in delay_model_json["receptor_delays"]:
dish_id = delay_detail["receptor"]
if dish_id not in self._assigned_dish_ids:
self.logger.warning(
f"Delay model contains data for DISH ID {dish_id} not belonging to this subarray, ignoring delay model."
)
valid_dish_ids = False
break
delay_detail["receptor"] = (
self._dish_utils.dish_id_to_vcc_id[dish_id]
)
# If valid, issue delay model to assigned resources
if valid_dish_ids:
self.logger.info(
f"Updating delay model; {delay_model_json}"
)
# We don't check result codes as we simply log a warning in
# issue_group_command and then continue if there is a failure
# in updating a delay model
delay_model_arg = orjson.dumps(delay_model_json).decode()
self.issue_group_command(
command_name="UpdateDelayModel",
command_args=delay_model_arg,
proxies=self.get_configured_fsp_mode_proxies(
ObsMode.IMAGING
),
is_lrc=False, # UpdateDelayModel is a fast command
check_on_fail={
"delayModel": lambda attr_val: attr_val
== delay_model_arg
},
)
# Update delayModel attribute signal
self._device.delay_model_signal = model
# -------------------
# Resourcing Commands
# -------------------
@property
def assigned_fsp_proxies(
self: CbfSubarrayComponentManager,
) -> Generator[tango.DeviceProxy]:
"""Return an iterable containing all assigned Fsp device proxies."""
return (
self.proxy_dict[const.FSP][fsp_id]
for fsp_id in self._assigned_fsp_ids
)
@property
def configured_fsp_mode_proxies(
self: CbfSubarrayComponentManager,
) -> Generator[tango.DeviceProxy]:
"""Return an iterable containing all configured FSP ObsMode subarray device proxies."""
return chain(
self.get_configured_fsp_mode_proxies(ObsMode.IMAGING),
self.get_configured_fsp_mode_proxies(ObsMode.PULSAR_TIMING),
self.get_configured_fsp_mode_proxies(ObsMode.PULSAR_SEARCH),
)
@property
def configured_vcc_proxies(
self: CbfSubarrayComponentManager,
) -> Generator[tango.DeviceProxy]:
"""Return a list of proxies to all currently configured VCCAllBandsController devices."""
return (
self.proxy_dict[const.VCC_ALL_BANDS][vcc_id]
for vcc_id in self._configured_vcc_ids
)
@property
def configured_obs_proxies(
self: CbfSubarrayComponentManager,
) -> Generator[tango.DeviceProxy]:
"""Return a list of proxies to all currently assigned observing devices."""
return chain(
self.configured_vcc_proxies, self.configured_fsp_mode_proxies
)
# --- AssignResources Command --- #
def _fqdn_contains_vcc_id(
self: CbfSubarrayComponentManager, fqdn: str, vcc_id: int
):
return vcc_id == int(fqdn.split("/")[2])
[docs]
def assign_vcc(
self: CbfSubarrayComponentManager,
params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
Assign resources to subarray.
:param params: The JSON object containing a list of
DISH (receptor) IDs to be assigned
:param task_callback: callback for driving status of task executor's
current LRC task
:param task_abort_event: event indicating Abort has been issued
"""
# set task status in progress, check for abort event
task_callback(status=TaskStatus.IN_PROGRESS)
if self.task_abort_event_is_set(
COMMAND_ASSIGN_RESOURCES, task_callback, task_abort_event
):
return
# Validate Interface
valid, param_dict = self._handle_command_validation(
COMMAND_ASSIGN_RESOURCES, params, task_callback
)
if not valid:
return
self._set_transaction_id(param_dict)
self._log_transaction_id(COMMAND_ASSIGN_RESOURCES, param_dict)
if self._dish_utils is None:
try:
self._dish_utils = DISHUtils(
orjson.loads(self._device.sys_param_signal)
)
except (KeyError, orjson.JSONDecodeError) as exc:
self.logger.error(
f"Failed to update DISH ID to VCC ID and frequency offset k mapping, "
f"current system parameters: {self._device.sys_param_signal}; {exc}"
)
task_callback(
status=TaskStatus.COMPLETED,
result=(
ResultCode.FAILED,
"Failed to initialize system parameters.",
),
)
return
# Build a dict of VCCs to assign.
vcc_to_assign = {}
for dish_id in param_dict[param_keys.RECEPTOR_IDS]:
self.logger.debug(f"Attempting to assign DISH ID {dish_id}")
try:
if dish_id in self._assigned_dish_ids:
raise KeyError("already assigned to this subarray.")
vcc_id = self._dish_utils.dish_id_to_vcc_id[dish_id]
if 0 >= vcc_id > len(self._fqdn_vcc_all_bands):
raise KeyError(
f"VCC {vcc_id} not in current capabilities."
)
# Select VCC proxy to assign, subscribe to LRC results
vcc_proxy = self.proxy_dict[const.VCC_ALL_BANDS][vcc_id]
vcc_to_assign[vcc_proxy.dev_name()] = {
"dish_id": dish_id,
"vcc_id": vcc_id,
}
self._event_handler.subscribe_events(
proxy=vcc_proxy,
change_event_callbacks={
"longRunningCommandResult": self.results_callback,
},
)
except (KeyError, tango.DevFailed) as error:
self.logger.warning(
f"Failed to assign DISH ID {dish_id}; {error}"
)
continue
# First check to see if any valid data added to vcc_to_assign.
if len(vcc_to_assign) == 0:
task_callback(
status=TaskStatus.COMPLETED,
result=(
ResultCode.FAILED,
"Failed to assign any DISH IDs.",
),
)
return
# UpdateSubarrayMembership allowed to partially succeed to attempt
# resource assignment on best-effort basis.
command_results = self.issue_group_command(
command_name="UpdateSubarrayMembership",
command_args=self._subarray_id,
proxies=(
self.proxy_dict[const.VCC_ALL_BANDS][vcc["vcc_id"]]
for vcc in vcc_to_assign.values()
),
partial_success=True,
check_on_fail={
"subarrayID": lambda attr_val: attr_val == self._subarray_id,
"obsState": lambda attr_val: attr_val == ObsState.IDLE,
},
)
for dev_name, result_code in command_results.items():
if result_code != ResultCode.OK:
vcc = vcc_to_assign.pop(dev_name)
self._event_handler.unsubscribe_events(
proxy=self.proxy_dict[const.VCC_ALL_BANDS][vcc["vcc_id"]]
)
self.logger.warning(
f"Failed to assign DISH ID {vcc['dish_id']}"
)
# Second check to see if any values in vcc_to_assign remain after
# checking command results.
if len(vcc_to_assign) == 0:
task_callback(
status=TaskStatus.COMPLETED,
result=(
ResultCode.FAILED,
"Failed to assign any DISH IDs.",
),
)
return
# If there are VCCs to assign, Update obsState machine via callback if previously
# unresourced.
if len(self._assigned_dish_ids) == 0:
self._update_component_state(resourced=True)
# Update instance attributes and device attribute signals
dish_ids = sorted(vcc["dish_id"] for vcc in vcc_to_assign.values())
self._assigned_dish_ids.update(dish_ids)
self._device.dish_ids_signal = sorted(self._assigned_dish_ids)
self._assigned_vcc_ids.update(
vcc["vcc_id"] for vcc in vcc_to_assign.values()
)
self._device.vcc_ids_signal = sorted(self._assigned_vcc_ids)
self.logger.info(
f"DISH IDs after assignment: {self._assigned_dish_ids}"
)
task_callback(
result=(
ResultCode.OK,
(
"AssignResources completed OK; successfully assigned "
+ ", ".join(dish_ids)
),
),
status=TaskStatus.COMPLETED,
)
return
# --- ReleaseResources Command --- #
def _release_vcc_resources(
self: CbfSubarrayComponentManager, dish_ids: Iterable[str]
) -> List[str]:
"""
Main loop for use in releasing VCC resources, shared between resource-releasing
commands and Restart command
:param dish_ids: list of DISH IDs
:return: list of successfully removed DISH IDs
"""
# Build a dict of VCCs to release.
vcc_to_release = {}
for dish_id in dish_ids:
self.logger.debug(f"Attempting to remove {dish_id}")
if dish_id not in self._assigned_dish_ids:
self.logger.warning(
f"Skipping receptor {dish_id} as it is not currently assigned to this subarray."
)
continue
vcc_id = self._dish_utils.dish_id_to_vcc_id[dish_id]
vcc_to_release[
self.proxy_dict[const.VCC_ALL_BANDS][vcc_id].dev_name()
] = {
"dish_id": dish_id,
"vcc_id": vcc_id,
}
# First check to see if any valid data added to vcc_to_release.
if len(vcc_to_release) == 0:
self.logger.error("Failed to release any DISH IDs.")
return []
# UpdateSubarrayMembership allowed to partially succeed to attempt
# resource release on best-effort basis.
command_results = self.issue_group_command(
command_name="UpdateSubarrayMembership",
command_args=0,
proxies=(
self.proxy_dict[const.VCC_ALL_BANDS][vcc["vcc_id"]]
for vcc in vcc_to_release.values()
),
partial_success=True,
check_on_fail={
"subarrayID": lambda attr_val: attr_val == 0,
"obsState": lambda attr_val: attr_val == ObsState.IDLE,
},
)
for dev_name, result_code in command_results.items():
if result_code != ResultCode.OK:
vcc = vcc_to_release.pop(dev_name)
self.logger.warning(
f"Failed to release DISH ID {vcc['dish_id']}"
)
# Second check to see if any values in vcc_to_release remain after
# checking command results.
if len(vcc_to_release) == 0:
self.logger.error("Failed to release any DISH IDs.")
return []
# Unsubscribe from VCC LRC results.
for vcc in vcc_to_release.values():
self._event_handler.unsubscribe_events(
proxy=self.proxy_dict[const.VCC_ALL_BANDS][vcc["vcc_id"]]
)
# Update instance attributes and device attribute signals
dish_ids = sorted(vcc["dish_id"] for vcc in vcc_to_release.values())
self._assigned_dish_ids.difference_update(dish_ids)
self._device.dish_ids_signal = sorted(self._assigned_dish_ids)
self._assigned_vcc_ids.difference_update(
vcc["vcc_id"] for vcc in vcc_to_release.values()
)
self._device.vcc_ids_signal = sorted(self._assigned_vcc_ids)
# Update ObsState machine via callback and reset system parameters if now unresourced
if len(self._assigned_dish_ids) == 0:
self._update_component_state(resourced=False)
self._dish_utils = None
self.logger.info(f"DISH IDs after removal: {self._assigned_dish_ids}")
return dish_ids
[docs]
def release_vcc(
self: CbfSubarrayComponentManager,
params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
Release dishes from subarray.
:param params: JSON string with the list of DISH
(receptor) IDs to be removed
:param task_callback: callback for driving status of task executor's
current LRC task
:param task_abort_event: event indicating Abort has been issued
"""
# set task status in progress, check for abort event
task_callback(status=TaskStatus.IN_PROGRESS)
if self.task_abort_event_is_set(
COMMAND_RELEASE_RESOURCES, task_callback, task_abort_event
):
return
# Validate Command
valid, param_dict = self._handle_command_validation(
COMMAND_RELEASE_RESOURCES, params, task_callback
)
if not valid:
return
self._set_transaction_id(param_dict)
self._log_transaction_id(COMMAND_RELEASE_RESOURCES, param_dict)
dish_ids = param_dict[param_keys.RECEPTOR_IDS]
input_dish_valid, msg = self._dish_utils.are_Valid_DISH_Ids(dish_ids)
if not input_dish_valid:
task_callback(
status=TaskStatus.COMPLETED,
result=(ResultCode.FAILED, msg),
)
return
dish_ids_released = self._release_vcc_resources(dish_ids=dish_ids)
if len(dish_ids_released) == 0:
task_callback(
status=TaskStatus.COMPLETED,
result=(
ResultCode.FAILED,
"Failed to release any DISH IDs.",
),
)
return
task_callback(
result=(
ResultCode.OK,
"ReleaseResources completed OK; successfully released "
+ ", ".join(dish_ids_released),
),
status=TaskStatus.COMPLETED,
)
return
[docs]
def release_all_vcc(
self: CbfSubarrayComponentManager,
params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
Release all resources/dishes from subarray.
:param task_callback: callback for driving status of task executor's
current LRC task
:param task_abort_event: event indicating Abort has been issued
"""
# set task status in progress, check for abort event
task_callback(status=TaskStatus.IN_PROGRESS)
if self.task_abort_event_is_set(
COMMAND_RELEASE_RESOURCES, task_callback, task_abort_event
):
return
# Validate the input
valid, release_all_vcc_params_dict = self._handle_command_validation(
COMMAND_RELEASE_ALL_RESOURCES,
params,
task_callback,
)
if not valid:
return
self._set_transaction_id(release_all_vcc_params_dict)
self._log_transaction_id(
COMMAND_RELEASE_ALL_RESOURCES, release_all_vcc_params_dict
)
self._release_vcc_resources(
dish_ids=list(self._device.dish_ids_signal)
)
if len(self._device.dish_ids_signal) != 0:
task_callback(
status=TaskStatus.COMPLETED,
result=(
ResultCode.FAILED,
"Failed to remove all DISH IDs.",
),
)
return
task_callback(
result=(ResultCode.OK, "ReleaseAllResources completed OK"),
status=TaskStatus.COMPLETED,
)
return
# -------------
# Scan Commands
# -------------
# --- ConfigureScan Command --- #
def _validate_configure_scan_input(
self: CbfSubarrayComponentManager, config: str
) -> tuple[bool, dict]:
"""
Validate scan configuration JSON.
:param config: The configuration as JSON formatted string.
:return: False and None if validation failed, otherwise True and valid
decoded dict
"""
self.logger.info("Validating ConfigureScan input JSON...")
# Validate supported interface passed in the JSON string.
# If invalid, validate_interface will return a message instead of the
# param dict.
(valid, config_dict) = validate_interface(config, "configurescan")
if not valid:
self.logger.error(config_dict)
return False, None
# Validate full_configuration against the telescope model
try:
# Workaround to enable strict validation here.
# Need to set both PYTEST_VERSION to something and
# SKA_SCHEMAS_ALLOW_STRICT_VALIDATION to True
pytest_version = os.environ.get("PYTEST_VERSION")
if pytest_version is None:
os.environ["PYTEST_VERSION"] = "7.2.0"
os.environ["SKA_SCHEMAS_ALLOW_STRICT_VALIDATION"] = "True"
schema_validate(
version=config_dict["interface"],
config=config_dict,
strictness=2,
)
self.logger.info("Scan configuration is valid against telmodel!")
except ValueError as value_error:
self.logger.error(
f"ConfigureScan JSON validation against the telescope model schema failed; {value_error}."
)
return False, None
# At this point, validate FSP, VCC, subscription parameters
validate_scan_config = self.proxy_dict[
const.CONTROLLER
].validateSupportedConfiguration
# MCS Scan Configuration Validation
if validate_scan_config is True:
validator = SubarrayScanConfigurationValidator(
scan_configuration=config_dict,
dish_ids=list(self._assigned_dish_ids),
subarray_id=self._subarray_id,
logger=self.logger,
count_fsp=len(self._fqdn_fsp),
)
success, msg = validator.validate_input()
if success:
self.logger.info(msg)
else:
self.logger.error(msg)
config_dict = None
return success, config_dict
else:
self.logger.info(
"Skipping MCS supported configuration validation."
)
return True, config_dict
def _vcc_configure_scan(
self: CbfSubarrayComponentManager,
configuration: Dict[Any, Any],
) -> bool:
"""
Issue Vcc ConfigureScan command
:param configuration: scan configuration dict
:return: True if VCC ConfigureScan was successful, otherwise False
"""
self.logger.info(
"Issuing ConfigureScan command to FHS VCC all bands controllers."
)
command_results = self.issue_group_command(
command_name="ConfigureScan",
command_args=vcc_config_gen(
configuration=configuration,
vcc_ids=self._assigned_vcc_ids,
vcc_unit_utils=self._vcc_unit_util,
dish_utils=self._dish_utils,
),
proxies=(
self.proxy_dict[const.VCC_ALL_BANDS][vcc_id]
for vcc_id in self._assigned_vcc_ids
),
check_on_fail={
"obsState": lambda attr_val: attr_val == ObsState.READY,
},
)
configure_scan_failure = False
for vcc_id in self._assigned_vcc_ids:
if (
command_results[
self.proxy_dict[const.VCC_ALL_BANDS][vcc_id].dev_name()
]
== ResultCode.OK
):
self._configured_vcc_ids.add(vcc_id)
else:
configure_scan_failure = True
if configure_scan_failure:
return False
return True
def _get_obs_mode_fsp_ids(
self: CbfSubarrayComponentManager,
configuration: Dict[Any, Any],
) -> bool:
"""
Return mapping of FSP IDs to observing modes requested in ConfigureScan.
:param configuration: ConfigureScan dict
:return: dict containing observing mode key names to FSP IDs
"""
obs_mode_to_fsp_ids = {}
for key in configuration["midcbf"].keys():
if (
key in obs_mode_key_to_name
and ObsMode[obs_mode_key_to_name[key]]
in supported_values["observing_modes"]
):
obs_mode_to_fsp_ids[key] = []
for processing_region in configuration["midcbf"][key][
"processing_regions"
]:
obs_mode_to_fsp_ids[key].extend(
processing_region["fsp_ids"]
)
return obs_mode_to_fsp_ids
def _fsp_configure_scan(
self: CbfSubarrayComponentManager,
configuration: Dict[Any, Any],
) -> bool:
"""
Issue FSP function mode subarray ConfigureScan command
:param configuration: ConfigureScan dict
:return: True if successfully configured all FSP devices, otherwise False
"""
self.logger.info("Configuring FSPs for scan...")
# Configuration has been validated at this point so we don't need to worry
# about repeated FSP IDs across processing regions.
obs_mode_fsp_ids = self._get_obs_mode_fsp_ids(configuration)
if len(obs_mode_fsp_ids) == 0:
self.logger.error(
"Failed to parse configuration for valid processing regions, "
"may contain unsupported observing mode data; supported modes: "
f"{supported_values['observing_modes']}"
)
return False
for obs_mode_key, fsp_ids in obs_mode_fsp_ids.items():
obs_mode = obs_mode_key_to_name[obs_mode_key]
# TODO: remove as observing modes become supported
if obs_mode in (
ObsMode.PULSAR_TIMING.name,
ObsMode.PULSAR_SEARCH.name,
ObsMode.VLBI.name,
):
self.logger.error(f"ObsMode.{obs_mode} currently unsupported.")
continue
# --- FSP UpdateObsMode command --- #
# Subscribe to FSP device LRC change events
for fsp_id in fsp_ids:
self._event_handler.subscribe_events(
proxy=self.proxy_dict[const.FSP][fsp_id],
change_event_callbacks={
"longRunningCommandResult": self.results_callback
},
)
# Issue UpdateObsMode command to all FSPs in the same mode.
update_obs_mode_arg = orjson.dumps(
{
"subarray_id": self._subarray_id,
"obs_mode": obs_mode,
}
).decode()
command_results = self.issue_group_command(
command_name="UpdateObsMode",
command_args=update_obs_mode_arg,
proxies=(
self.proxy_dict[const.FSP][fsp_id] for fsp_id in fsp_ids
),
check_on_fail={
"subarrayMembership": lambda attr_val: self._subarray_id
in attr_val,
"obsMode": lambda attr_val: attr_val == ObsMode[obs_mode],
},
)
update_obs_mode_failure = False
for fsp_id in fsp_ids:
dev_name = self.proxy_dict[const.FSP][fsp_id].dev_name()
if command_results[dev_name] == ResultCode.OK:
self._assigned_fsp_ids.add(fsp_id)
self.logger.info(f"FSP {fsp_id} successfully assigned.")
else:
update_obs_mode_failure = True
self.logger.error(f"Failed to assign FSP {fsp_id}.")
if update_obs_mode_failure:
return False
# --- FSP ObsMode Subarray ConfigureScan command --- #
# Subscribe to FSP ObsMode Subarray device LRC change events
for fsp_id in fsp_ids:
self._event_handler.subscribe_events(
proxy=self.proxy_dict[obs_mode][fsp_id],
change_event_callbacks={
"longRunningCommandResult": self.results_callback
},
)
command_results = self.issue_group_command(
command_name="ConfigureScan",
command_args=fsp_config_gen(
obs_mode_key=obs_mode_key,
configuration=configuration,
dish_utils=self._dish_utils,
subarray_dish_ids=self._assigned_dish_ids,
wideband_shift=0,
),
proxies=(
self.proxy_dict[obs_mode][fsp_id] for fsp_id in fsp_ids
),
check_on_fail={
"obsState": lambda attr_val: attr_val == ObsState.READY,
},
)
configure_failure = False
for fsp_id in fsp_ids:
dev_name = self.proxy_dict[obs_mode][fsp_id].dev_name()
if command_results[dev_name] == ResultCode.OK:
self.logger.info(f"{dev_name} successfully configured.")
self._configured_fsp_mode_ids[obs_mode].add(fsp_id)
else:
self.logger.error(f"Failed to configure {dev_name}.")
self._event_handler.unsubscribe_events(
proxy=self.proxy_dict[obs_mode][fsp_id]
)
configure_failure = True
if configure_failure:
return False
return True
def _subscribe_tm_event(
self: CbfSubarrayComponentManager,
subscription_point: str,
callback: Callable,
) -> bool:
"""
Subscribe to change events on TM-published data subscription point
:param subscription_point: FQDN of TM data subscription point
:param callback: callback for event subscription
:return: False if VCC ConfigureScan command failed, otherwise True
"""
self.logger.info(f"Attempting subscription to {subscription_point}")
# split delay_model_subscription_point between device FQDN and attribute name
subscription_point_split = subscription_point.split("/")
fqdn = "/".join(subscription_point_split[:-1])
attr_name = subscription_point_split[-1]
try:
self.proxy_dict[const.TM_LEAF_NODE] = context.DeviceProxy(
device_name=fqdn
)
self._event_handler.subscribe_events(
proxy=self.proxy_dict[const.TM_LEAF_NODE],
change_event_callbacks={attr_name: callback},
)
except tango.DevFailed as dev_failed:
self.logger.error(
f"Failed to subscribe to change events for {subscription_point}; {dev_failed}"
)
return False
return True
def _deconfigure_vcc(
self: CbfSubarrayComponentManager, transaction_id: str
) -> bool:
"""
Return VCC to IDLE state if possible
:param transaction_id: optional transaction ID
:return: False if failed to release FSP device, True otherwise
"""
if len(self._configured_vcc_ids) == 0:
return True
self.logger.info(
f"Deconfiguring the following VCCs: {self._configured_vcc_ids}"
)
# Deconfigure any successfully configured VCC proxies.
go_to_idle_params = orjson.dumps(
{
"interface": CBF_GOTOIDLE_VER1_0,
"transaction_id": transaction_id,
}
).decode()
vcc_failure = False
command_results = self.issue_group_command(
command_name="GoToIdle",
command_args=go_to_idle_params,
proxies=self.configured_vcc_proxies,
check_on_fail={
"obsState": lambda attr_val: attr_val == ObsState.IDLE,
},
)
deconfigured_vcc_ids = []
for vcc_id in self._configured_vcc_ids:
if (
command_results[
self.proxy_dict[const.VCC_ALL_BANDS][vcc_id].dev_name()
]
== ResultCode.OK
):
deconfigured_vcc_ids.append(vcc_id)
else:
vcc_failure = True
self._configured_vcc_ids.difference_update(deconfigured_vcc_ids)
if vcc_failure:
self.logger.error(
f"Failed to deconfigure the following VCCs: {self._configured_vcc_ids}"
)
return False
self.logger.info("Successfully deconfigured all VCCs.")
return True
def _release_all_fsp(
self: CbfSubarrayComponentManager, transaction_id: str
) -> bool:
"""
Remove subarray membership and return FSP to IDLE state if possible
:param transaction_id: optional transaction ID
:return: False if failed to release FSP device, True otherwise
"""
if len(self._assigned_fsp_ids) == 0:
return True
self.logger.info(
f"Releasing all FSP from subarray; current assigned FSPs: {self._assigned_fsp_ids}"
)
# Deconfigure any successfully configured FSP mode proxies.
go_to_idle_params = orjson.dumps(
{
"interface": CBF_GOTOIDLE_VER1_0,
"transaction_id": transaction_id,
}
).decode()
fsp_failure = False
command_results = self.issue_group_command(
command_name="GoToIdle",
command_args=go_to_idle_params,
proxies=self.configured_fsp_mode_proxies,
check_on_fail={
"obsState": lambda attr_val: attr_val == ObsState.IDLE,
},
)
deconfigured_fsp_ids = []
for obs_mode, fsp_ids in self._configured_fsp_mode_ids.items():
for fsp_id in fsp_ids:
fsp_mode_proxy = self.proxy_dict[obs_mode][fsp_id]
if command_results[fsp_mode_proxy.dev_name()] == ResultCode.OK:
self._event_handler.unsubscribe_events(
proxy=fsp_mode_proxy
)
deconfigured_fsp_ids.append((obs_mode, fsp_id))
else:
fsp_failure = True
for obs_mode, fsp_id in deconfigured_fsp_ids:
self._configured_fsp_mode_ids[obs_mode].remove(fsp_id)
# Remove subarray membership from successfully assigned FSP
self.logger.info(
f"Setting FSPs {self._assigned_fsp_ids} to ObsMode.IDLE."
)
update_obs_mode_params = orjson.dumps(
{
"subarray_id": self._subarray_id,
"obs_mode": "IDLE",
}
).decode()
command_results = self.issue_group_command(
command_name="UpdateObsMode",
command_args=update_obs_mode_params,
proxies=self.assigned_fsp_proxies,
# Not checking if obsMode == ObsMode.IDLE as FSP may still be in use
# by other subarrays.
check_on_fail={
"subarrayMembership": lambda attr_val: self._subarray_id
not in attr_val,
},
)
fsp_ids_to_remove = []
for fsp_id in self._assigned_fsp_ids:
fsp_proxy = self.proxy_dict[const.FSP][fsp_id]
if command_results[fsp_proxy.dev_name()] == ResultCode.OK:
self._event_handler.unsubscribe_events(proxy=fsp_proxy)
fsp_ids_to_remove.append(fsp_id)
else:
fsp_failure = True
self._assigned_fsp_ids.difference_update(fsp_ids_to_remove)
if fsp_failure:
self.logger.error(
f"Failed to release all FSPs; assigned FSP IDs: {self._assigned_fsp_ids}; configured FSP modes: {self._configured_fsp_mode_ids}"
)
return False
self.logger.info("Successfully released all FSPs.")
return True
def _deconfigure(
self: CbfSubarrayComponentManager, transaction_id: str = ""
) -> bool:
"""
Completely deconfigure the subarray; all initialization performed by the
ConfigureScan command must be 'undone' here.
This method is invoked by GoToIdle, ConfigureScan, ObsReset and Restart
in CbfSubarray
:param transaction_id: optional transaction ID
:return: False if failed to deconfigure, otherwise True
"""
self.logger.info("Deconfiguring subarray...")
# Update device attribute signals
self._device.scan_id_signal = 0
self._device.configuration_id_signal = ""
self._device.frequency_band_signal = 0
# Unsubscribe from TMC events.
try:
if self.proxy_dict.get(const.TM_LEAF_NODE) is not None:
self.logger.info("Unsubscribing from TM events.")
self._event_handler.unsubscribe_events(
proxy=self.proxy_dict[const.TM_LEAF_NODE]
)
self.proxy_dict[const.TM_LEAF_NODE] = None
except tango.DevFailed as dev_failed:
self.logger.error(
f"Error in unsubscribing from TM events; {dev_failed}"
)
# Event processing thread exits when this is popped from the queue
if self._delay_model_thread is not None:
self.delay_model_event_callback(EventHandler.EXIT_CONDITION)
self._delay_model_thread.join()
self._delay_model_thread = None
# Clear queue; thread may have exited before reaching EXIT_CONDITION
self._clear_queue(self._delay_model_queue, "delayModel")
# Update delayModel attribute signal
self._device.delay_model_signal = orjson.dumps(
{"start_validity_sec": 0}
).decode()
fsp_success = self._release_all_fsp(transaction_id)
if not fsp_success:
return False
vcc_success = self._deconfigure_vcc(transaction_id)
if not vcc_success:
return False
return True
# --- Scan Command --- #
[docs]
def scan_start_time_rounded_callback(
self: CbfSubarrayComponentManager, event_data: tango.EventData
):
"""
Callback for FspModeSubarray's scanStartTimeRounded attribute change.
If _scan_started is False, set _scanStarted to True and sync the received
scanStartTimeRounded as firstOutputTime to all FspModeSubarray devices in the subarray.
:param event_data: Tango attribute change event data
:type event_data: Optional[tango.EventData]
"""
# NOTE (CIP-3559): event callbacks must be as short as possible
# Here we simply queue event data and handle possible exceptions
try:
self._scan_start_time_rounded_queue.put(event_data)
except Full:
# Start dropping data if queue is full
self._scan_start_time_rounded_queue.get_nowait()
self._scan_start_time_rounded_queue.task_done()
self._scan_start_time_rounded_queue.put(event_data)
def _set_first_output_time(
self: CbfSubarrayComponentManager,
):
"""
Process FspModeSubarray's scanStartTimeRounded attribute change events and
synchronize first output timestamps.
If _scan_started is False, set _scanStarted to True and sync the received
scanStartTimeRounded as firstOutputTime to all FspModeSubarray device in the subarray.
:param event_data: Tango attribute change event data
:type event_data: Optional[tango.EventData]
"""
with tango.EnsureOmniThread():
self.logger.info("Entering scanStartTimeRounded Thread...")
# Subscribe to scanStartTimeRounded change events, then begin processing
# events queue.
for fsp_mode_proxy in self.configured_fsp_mode_proxies:
self._event_handler.subscribe_events(
proxy=fsp_mode_proxy,
change_event_callbacks={
"scanStartTimeRounded": self.scan_start_time_rounded_callback
},
)
while True:
event_data = self._scan_start_time_rounded_queue.get()
self._scan_start_time_rounded_queue.task_done()
if event_data is EventHandler.EXIT_CONDITION:
# This ends the thread
self.logger.error(
"scanStartTimeRounded event received is EventHandler.EXIT_CONDITION"
)
break
# Skip zero timestamps.
timestamp = int(event_data.attr_value.value)
if timestamp == 0:
continue
current_scan_started = self._read_scan_started()
# Next sync the event value (startScanTimeRounded) to FspModeSubarrays's firstOutputTime
if (
current_scan_started is False
and self._device.obs_state_signal == ObsState.SCANNING
):
command_results = self.issue_group_command(
command_name="SetFirstOutputTime",
proxies=self.configured_fsp_mode_proxies,
command_args=timestamp,
check_on_fail={
"firstOutputTime": lambda attr_val: attr_val
== timestamp
},
)
if any(
result_code != ResultCode.OK
for result_code in command_results.values()
):
self.logger.error("Failed to sync firstOutputTime.")
# TODO: break here, but should we continue to try with
# the next scanStartTimeRounded event?
break
# Indicate the scan start time has been synchronized.
self._update_scan_started(True)
break
elif (
current_scan_started is True
and self._device.obs_state_signal == ObsState.SCANNING
):
self.logger.debug(
"Scan already started and firstOutputTime has already been set, skipping syncing firstOutputTime"
)
break
elif (
current_scan_started is True
and self._device.obs_state_signal != ObsState.SCANNING
):
self.logger.debug(
f"Current Subarray is in ObsState.{self._device.obs_state_signal.name} "
"but _scan_started flag is set to True. Skipping syncing firstOutputTime"
)
break
elif (
current_scan_started is False
and self._device.obs_state_signal != ObsState.SCANNING
):
self.logger.warning(
f"Scan has not been started, skipping syncing firstOutputTime of value {event_data.attr_value.value}; "
f"scan_started: {current_scan_started}; "
f"observing state: {self._device.obs_state_signal}"
)
# the elif case covers all permutations, no else case needed
# Since we only need one timestamp, we can unsubscribe from all events here.
for proxy in self.configured_fsp_mode_proxies:
self._event_handler.unsubscribe_events(
proxy=proxy,
attr_names=["scanStartTimeRounded"],
)
self.logger.info("Exiting scanStartTimeRounded Thread.")
[docs]
def scan(
self: CbfSubarrayComponentManager,
params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
Start subarray Scan operation.
:param params: JSON formatted string with the scan id
:param task_callback: callback for driving status of task executor's
current LRC task
:param task_abort_event: event indicating Abort has been issued
"""
# set task status in progress, check for abort event
task_callback(status=TaskStatus.IN_PROGRESS)
if self.task_abort_event_is_set(
"Scan", task_callback, task_abort_event
):
return
# Validate the input
valid, scan_params_dict = self._handle_command_validation(
const.COMMAND_SCAN, params, task_callback
)
if not valid:
return
self._set_transaction_id(scan_params_dict)
self._log_transaction_id(const.COMMAND_SCAN, scan_params_dict)
# Flag for scanStartTimeRounded callbacks to determine if the Subarray has already sync the value to the FSPs
# TODO: Need to update comment when FspPstSubarray is implemented?
# Is scan consider started when either FspModeSubarray sends the first scanStartTimeRounded?
self._update_scan_started(False)
if self._scan_start_time_rounded_thread is None:
self._scan_start_time_rounded_thread = Thread(
target=self._set_first_output_time,
name="CbfSubarray._set_first_output_time",
daemon=True,
)
self._scan_start_time_rounded_thread.start()
# Issue Scan command to assigned resources.
command_results = self.issue_group_command(
command_name="Scan",
command_args=orjson.dumps(scan_params_dict).decode(),
proxies=self.configured_obs_proxies,
check_on_fail={
"obsState": lambda attr_val: attr_val == ObsState.SCANNING,
},
)
if any(
result_code != ResultCode.OK
for result_code in command_results.values()
):
task_callback(
status=TaskStatus.COMPLETED,
result=(
ResultCode.FAILED,
"Failed to issue Scan command to VCC/FSP",
),
)
return
# Update scanID attribute signal
self._device.scan_id_signal = int(scan_params_dict["scan_id"])
# Update ObsState machine via callback
self._update_component_state(scanning=True)
task_callback(
result=(ResultCode.OK, "Scan completed OK"),
status=TaskStatus.COMPLETED,
)
return
# --- EndScan Command --- #
[docs]
def end_scan(
self: CbfSubarrayComponentManager,
params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
End scan operation.
:param params: JSON string with endscan input parameters
:param task_callback: callback for driving status of task executor's
current LRC task
:param task_abort_event: event indicating Abort has been issued
"""
# set task status in progress, check for abort event
task_callback(status=TaskStatus.IN_PROGRESS)
if self.task_abort_event_is_set(
"EndScan", task_callback, task_abort_event
):
return
# Validate the input
valid, endscan_params_dict = self._handle_command_validation(
const.COMMAND_ENDSCAN, params, task_callback
)
if not valid:
return
self._set_transaction_id(endscan_params_dict)
self._log_transaction_id(const.COMMAND_ENDSCAN, endscan_params_dict)
# Issue EndScan to assigned resources.
command_results = self.issue_group_command(
command_name="EndScan",
command_args=params,
proxies=self.configured_obs_proxies,
check_on_fail={
"obsState": lambda attr_val: attr_val == ObsState.READY,
},
)
if any(
result_code != ResultCode.OK
for result_code in command_results.values()
):
task_callback(
status=TaskStatus.COMPLETED,
result=(
ResultCode.FAILED,
"Failed to issue EndScan command to VCC/FSP",
),
)
return
if self._scan_start_time_rounded_thread is not None:
self.scan_start_time_rounded_callback(EventHandler.EXIT_CONDITION)
self._scan_start_time_rounded_thread.join()
self._scan_start_time_rounded_thread = None
# Clear queue; thread may have exited before reaching EXIT_CONDITION
self._clear_queue(
self._scan_start_time_rounded_queue, "scanStartTimeRounded"
)
self._update_scan_started(False)
# Update ObsState machine via callback
self._update_component_state(scanning=False)
task_callback(
result=(ResultCode.OK, "EndScan completed OK"),
status=TaskStatus.COMPLETED,
)
return
# --- GoToIdle Command --- #
[docs]
def go_to_idle(
self: CbfSubarrayComponentManager,
params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
Execute observing state transition from READY to IDLE.
:param params: JSON string with endscan input parameters
:param task_callback: callback for driving status of task executor's
current LRC task
:param task_abort_event: event indicating Abort has been issued
"""
# set task status in progress, check for abort event
task_callback(status=TaskStatus.IN_PROGRESS)
if self.task_abort_event_is_set(
"GoToIdle", task_callback, task_abort_event
):
return
# Validate the input
valid, go_to_idle_params_dict = self._handle_command_validation(
const.COMMAND_GOTOIDLE, params, task_callback
)
if not valid:
return
self._set_transaction_id(go_to_idle_params_dict)
self._log_transaction_id(
const.COMMAND_GOTOIDLE, go_to_idle_params_dict
)
transaction_id = go_to_idle_params_dict[param_keys.TRANSACTION_ID]
# deconfigure to reset assigned FSPs and unsubscribe from events.
deconfigure_success = self._deconfigure(transaction_id)
if not deconfigure_success:
task_callback(
status=TaskStatus.COMPLETED,
result=(
ResultCode.FAILED,
"Failed to deconfigure subarray",
),
)
return
# Update ObsState machine via callback
self._update_component_state(configured=False)
task_callback(
result=(ResultCode.OK, "GoToIdle completed OK"),
status=TaskStatus.COMPLETED,
)
return
# --------------
# Abort Commands
# --------------
# --- Abort Command --- #
[docs]
def abort(
self: CbfSubarrayComponentManager,
params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
Abort the current scan operation.
Due to the obsState model trigger "abort_completed" always transitioning
out of ABORTING to ABORTED, this thread must not return until the very end,
otherwise the device could successfully transition to ABORTED without issuing
a scan abort to sub-devices (refactor in response to SKB-796).
:param params: JSON string with endscan input parameters
:param task_callback: callback for driving status of task executor's
current LRC task
:param task_abort_event: event indicating AbortCommands has been issued
:return: None
"""
result = (ResultCode.OK, "Abort completed OK")
status = TaskStatus.COMPLETED
# set task status in progress, check for abort event
task_callback(status=TaskStatus.IN_PROGRESS)
if self.task_abort_event_is_set(
"Abort", task_callback, task_abort_event
):
message = "Abort command aborted by task executor abort event"
self.logger.error(message)
result = (ResultCode.FAILED, message)
status = TaskStatus.COMPLETED
# Validate the input
# Use _validate_command over _handle_command_validation because:
# 1) We don't want to set the task_callback (see method docstring)
# 2) We want the invalid reason to put into the result
valid, param_dict, invalid_reason = self._validate_command(
const.COMMAND_ABORT, params
)
if not valid:
result = (ResultCode.FAILED, invalid_reason)
status = TaskStatus.COMPLETED
param_dict = {
param_keys.INTERFACE: CBF_ABORT_VER1_0,
param_keys.TRANSACTION_ID: "",
}
self._set_transaction_id(param_dict)
self._log_transaction_id(const.COMMAND_ABORT, param_dict)
# TODO: Determine subarray Abort command control of VCCAllBandsController.
# Currently VCCAllBandsController inherits from ska-tango-base v1.0.0,
# implementing only the AbortCommands command, which has a different
# use procedure than the way Abort was implemented here previously.
# Issue Abort to assigned resources.
command_results = self.issue_group_command(
command_name="Abort",
proxies=self.configured_obs_proxies,
command_args=orjson.dumps(param_dict).decode(),
check_on_fail={
"obsState": lambda attr_val: attr_val == ObsState.ABORTED,
},
)
if any(
result_code != ResultCode.OK
for result_code in command_results.values()
):
message = "Failed to issue Abort command to VCC/FSP"
self.logger.error(message)
result = (ResultCode.FAILED, message)
status = TaskStatus.COMPLETED
# Update ObsState machine via callback
self._update_component_state(scanning=False)
task_callback(result=result, status=status)
self._update_scan_started(False)
return
# --- ObsReset Command --- #
[docs]
def obs_reset(
self: CbfSubarrayComponentManager,
obsreset_params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
Reset the scan operation to IDLE from ABORTED or FAULT.
Due to the obsState model trigger "obsreset_invoked" only allowing a
transition out of ABORTED and FAULT states, and "obsreset_completed" always
transitioning out of RESETTING to IDLE, this thread must not return until
the very end, otherwise the device could successfully transition to IDLE
without properly deconfiguring (refactor in response to SKB-796).
:param task_callback: callback for driving status of task executor's
current LRC task
:param task_abort_event: event indicating AbortCommands has been issued
:return: None
"""
result = (ResultCode.OK, "ObsReset completed OK")
status = TaskStatus.COMPLETED
# set task status in progress, check for abort event
task_callback(status=TaskStatus.IN_PROGRESS)
if self.task_abort_event_is_set(
"ObsReset", task_callback, task_abort_event
):
message = "ObsReset command aborted by task executor abort event"
self.logger.error(message)
result = (ResultCode.FAILED, message)
status = TaskStatus.COMPLETED
# Validate the input
# Use _validate_command over _handle_command_validation because:
# 1) We don't want to set the task_callback (see method docstring)
# 2) We want the invalid reason to put into the result
valid, param_dict, invalid_reason = self._validate_command(
const.COMMAND_OBSRESET, obsreset_params
)
if not valid:
result = (ResultCode.FAILED, invalid_reason)
status = TaskStatus.COMPLETED
param_dict = {
param_keys.INTERFACE: CBF_OBSRESET_VER1_0,
param_keys.TRANSACTION_ID: "",
}
self._set_transaction_id(param_dict)
self._log_transaction_id(const.COMMAND_OBSRESET, param_dict)
transaction_id = param_dict[param_keys.TRANSACTION_ID]
# if subarray is in FAULT, we must first abort VCC and FSP operation
# this will allow us to call ObsReset on them even if they are not in FAULT
if self._component_state["obsfault"]:
# build abort command parameters
abort_params = {
"interface": CBF_ABORT_VER1_0,
"transaction_id": transaction_id,
}
command_results = self.issue_group_command(
command_name="Abort",
proxies=self.configured_obs_proxies,
command_args=orjson.dumps(abort_params).decode(),
check_on_fail={
"obsState": lambda attr_val: attr_val == ObsState.ABORTED,
},
)
if any(
result_code != ResultCode.OK
for result_code in command_results.values()
):
message = "Failed to issue Abort command to VCC/FSP"
self.logger.error(message)
result = (ResultCode.FAILED, message)
status = TaskStatus.COMPLETED
command_results = self.issue_group_command(
command_name="ObsReset",
command_args=orjson.dumps(param_dict).decode(),
proxies=self.configured_obs_proxies,
check_on_fail={
"obsState": lambda attr_val: attr_val == ObsState.IDLE,
},
)
if any(
result_code != ResultCode.OK
for result_code in command_results.values()
):
message = "Failed to issue ObsReset command to VCC/FSP"
self.logger.error(message)
result = (ResultCode.FAILED, message)
status = TaskStatus.COMPLETED
# We might have interrupted a long-running command such as a Configure
# or a Scan, so we need to clean up from that.
# deconfigure to reset assigned FSPs and unsubscribe from events.
deconfigure_success = self._deconfigure(transaction_id)
if not deconfigure_success:
message = "Failed to deconfigure subarray"
self.logger.error(message)
result = (ResultCode.FAILED, message)
status = TaskStatus.COMPLETED
# Update ObsState machine via callback
# There is no obsfault == False action implemented, however,
# we reset it to False so that obsfault == True may be triggered in the future,
# by updating the component state dict in BaseComponentManager.
self._update_component_state(configured=False, obsfault=False)
task_callback(result=result, status=status)
return
# --- Restart Command --- #
[docs]
def restart(
self: CbfSubarrayComponentManager,
restart_params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
Reset the scan operation to EMPTY from ABORTED or FAULT.
Due to the obsState model trigger "restart_invoked" only allowing a
transition out of ABORTED and FAULT states, and "restart_completed" always
transitioning out of RESTARTING to EMPTY, this thread must not return until
the very end, otherwise the device could successfully transition to EMPTY
without properly deconfiguring (refactor in response to SKB-796).
:param task_callback: callback for driving status of task executor's
current LRC task
:param task_abort_event: event indicating AbortCommands has been issued
:return: None
"""
result = (ResultCode.OK, "Restart completed OK")
status = TaskStatus.COMPLETED
# set task status in progress, check for abort event
task_callback(status=TaskStatus.IN_PROGRESS)
if self.task_abort_event_is_set(
"Restart", task_callback, task_abort_event
):
message = "Restart command aborted by task executor abort event"
self.logger.error(message)
result = (ResultCode.FAILED, message)
status = TaskStatus.COMPLETED
# Validate the input
# Use _validate_command over _handle_command_validation because:
# 1) We don't want to set the task_callback (see method docstring)
# 2) We want the invalid reason to put into the result
valid, param_dict, invalid_reason = self._validate_command(
COMMAND_RESTART, restart_params
)
if not valid:
result = (ResultCode.FAILED, invalid_reason)
status = TaskStatus.COMPLETED
param_dict = {
param_keys.INTERFACE: CBF_RESTART_VER1_0,
param_keys.TRANSACTION_ID: "",
}
self._set_transaction_id(param_dict)
self._log_transaction_id(COMMAND_RESTART, param_dict)
transaction_id = param_dict[param_keys.TRANSACTION_ID]
# if subarray is in FAULT, we must first abort VCC and FSP operation
# this will allow us to call ObsReset on them even if they are not in FAULT
if self._component_state["obsfault"]:
# build abort command parameters
abort_params = orjson.dumps(
{
"interface": CBF_ABORT_VER1_0,
"transaction_id": transaction_id,
}
).decode()
command_results = self.issue_group_command(
command_name="Abort",
proxies=self.configured_obs_proxies,
command_args=abort_params,
check_on_fail={
"obsState": lambda attr_val: attr_val == ObsState.ABORTED,
},
)
if any(
result_code != ResultCode.OK
for result_code in command_results.values()
):
message = "Failed to issue Abort command to VCC/FSP"
self.logger.error(message)
result = (ResultCode.FAILED, message)
status = TaskStatus.COMPLETED
# build ObsReset command parameters
obsreset_params = orjson.dumps(
{
"interface": CBF_OBSRESET_VER1_0,
"transaction_id": param_dict[param_keys.TRANSACTION_ID],
}
).decode()
command_results = self.issue_group_command(
command_name="ObsReset",
command_args=obsreset_params,
proxies=self.configured_obs_proxies,
check_on_fail={
"obsState": lambda attr_val: attr_val == ObsState.IDLE,
},
)
if any(
result_code != ResultCode.OK
for result_code in command_results.values()
):
message = "Failed to issue ObsReset command to VCC/FSP"
self.logger.error(message)
result = (ResultCode.FAILED, message)
status = TaskStatus.COMPLETED
# We might have interrupted a long-running command such as a Configure
# or a Scan, so we need to clean up from that.
# deconfigure to reset assigned FSPs and unsubscribe from events.
deconfigure_success = self._deconfigure(transaction_id)
if not deconfigure_success:
message = "Failed to deconfigure subarray"
self.logger.error(message)
result = (ResultCode.FAILED, message)
status = TaskStatus.COMPLETED
# Update ObsState machine via callback
self._update_component_state(configured=False)
# remove all assigned VCCs to return to EMPTY
self._release_vcc_resources(dish_ids=list(self._assigned_dish_ids))
if len(self._assigned_dish_ids) != 0:
message = "Failed to release all DISH resources."
self.logger.error(message)
result = (ResultCode.FAILED, message)
status = TaskStatus.COMPLETED
# Update ObsState machine via callback
# There is no obsfault == False action implemented, however,
# we reset it to False so that obsfault == True may be triggered in the future,
# by updating the component state dict in BaseComponentManager.
self._update_component_state(obsfault=False)
task_callback(result=result, status=status)
return