# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.
from __future__ import annotations
from os import environ
from queue import Queue
from threading import Event
from typing import Any, Callable, Dict, List, Optional
import orjson
import tango
from ska_control_model import AdminMode, ObsState, PowerState, TaskStatus
from ska_schemas.schema import validate as schema_validate
from ska_tango_base.commands import ResultCode
from ska_tango_testing import context
from ska_telmodel_client.frontend import TMData
from ska_mid_cbf_mcs.base.component_manager import (
CbfComponentManager,
CommunicationStatus,
)
from ska_mid_cbf_mcs.commons.dish_utils import DISHUtils
from ska_mid_cbf_mcs.commons.global_enum import const, param_keys
from ska_mid_cbf_mcs.commons.validate_interface import (
CBF_RESOURCESTATUS_VER1_0,
validate_interface,
)
from ska_mid_cbf_mcs.commons.vcc_unit_utils import VCCUnitUtil
__all__ = ["ControllerComponentManager"]
class ControllerComponentManager(CbfComponentManager):
    """
    A component manager for the CbfController device.
    """

    def __init__(
        self: ControllerComponentManager,
        *args: Any,
        fqdn_dict: Dict[str, List[str]],
        max_capabilities: Dict[str, int],
        vcc_id_to_vcc_unit_id_mapping: List[str],
        **kwargs: Any,
    ) -> None:
        """
        Initialise a new instance.

        :param fqdn_dict: dictionary containing FQDNs for the controller's
            sub-elements, keyed by device type
        :param max_capabilities: dictionary containing maximum number of
            sub-elements
        :param vcc_id_to_vcc_unit_id_mapping: VCC ID to VCC Unit ID mapping,
            passed to VCCUnitUtil
        """
        super().__init__(*args, **kwargs)
        self.validate_supported_configuration = True

        # --- Capabilities --- #
        self._count_subarray = max_capabilities[const.SUBARRAY]
        self._count_vcc = max_capabilities[const.VCC]
        self._count_fsp = max_capabilities[const.FSP]

        # --- FQDNs --- #
        # TODO (CIP-3072): for now, controller sets all FHS simulators ONLINE
        self._fqdn_dict = fqdn_dict

        # These are populated by the InitSysParam command
        self.dish_utils = None
        self.last_init_sys_param = ""
        self.source_init_sys_param = ""

        # Device types whose proxies are managed here, in initialization order
        self._device_types = [
            const.SUBARRAY,
            const.VCC_UNIT,
            const.FSP_UNIT,
        ]

        # Vcc Unit utils
        self._vcc_unit_util = VCCUnitUtil(vcc_id_to_vcc_unit_id_mapping)

    # -------------
    # Communication
    # -------------

    # --- Start Communicating --- #

    def _init_device_proxy(
        self: ControllerComponentManager,
        device_type: str,
        fqdn: str,
        device_id: int,
        admin_mode: AdminMode,
        subscribe_results: bool = False,
        subscribe_state: bool = False,
    ) -> bool:
        """
        Initialize the device proxy from its FQDN and store it in
        ``proxy_dict[device_type][device_id]``. If a proxy is already stored
        there, reuse it.

        Then optionally subscribe to change events and set the device's
        AdminMode.

        :param device_type: key into proxy_dict for this kind of device
        :param fqdn: FQDN of the device
        :param device_id: ID of the device within its device type
        :param admin_mode: AdminMode to write to the device
        :param subscribe_results: True if should subscribe to the device's
            longRunningCommandResult attribute; defaults to False
        :param subscribe_state: True if should subscribe to the device's state
            attribute; defaults to False
        :return: True if the device proxy is successfully initialized,
            False otherwise.
        """
        proxies = self.proxy_dict[device_type]
        if device_id in proxies:
            proxy = proxies[device_id]
        else:
            try:
                self.logger.info(f"Establishing connection to {fqdn}")
                proxy = context.DeviceProxy(device_name=fqdn)
            except Exception as err:
                # tango.DevFailed is an Exception subclass, so it is covered here;
                # any failure to build the proxy counts as a connection failure.
                self.logger.error(f"Failure in connection to {fqdn}: {err}")
                return False
            proxies[device_id] = proxy

        if subscribe_results:
            try:
                self._event_handler.subscribe_events(
                    proxy=proxy,
                    change_event_callbacks={
                        "longRunningCommandResult": self.results_callback
                    },
                )
            except tango.DevFailed as dev_failed:
                self.logger.error(
                    f"Failed to subscribe to {fqdn}/longRunningCommandResult attribute change events; {dev_failed}"
                )
                return False

        if subscribe_state:
            try:
                self._op_states[fqdn] = None
                self._event_handler.subscribe_events(
                    proxy=proxy,
                    change_event_callbacks={"state": self.op_state_callback},
                )
            except tango.DevFailed as dev_failed:
                self.logger.error(
                    f"Failed to subscribe to {fqdn}/state attribute change events; {dev_failed}"
                )
                return False

        try:
            self.logger.info(f"Setting {fqdn} {admin_mode.name}")
            proxy.adminMode = admin_mode
        except tango.DevFailed as dev_failed:
            self.logger.error(f"Failed to set AdminMode: {dev_failed}")
            return False

        return True

    def _init_device_proxies(
        self: ControllerComponentManager, admin_mode: AdminMode
    ) -> bool:
        """
        Initialize all device proxies.

        ``proxy_dict`` is reset for every managed device type. Device IDs are
        assigned 1-based, in the order the FQDNs appear in ``fqdn_dict``. Every
        device is attempted even if an earlier one fails.

        :param admin_mode: AdminMode to write to every sub-device
        :return: True if the device proxies are all successfully initialized,
            False otherwise.
        """
        init_success = True
        # TODO (CIP-3072): for now, controller sets all FHS simulators ONLINE
        for device_type in self._device_types:
            # Only subarray command results are monitored by the controller
            subscribe_results = device_type == const.SUBARRAY
            self.proxy_dict[device_type] = {}
            for device_id, fqdn in enumerate(self._fqdn_dict[device_type], 1):
                if not self._init_device_proxy(
                    device_type=device_type,
                    fqdn=fqdn,
                    device_id=device_id,
                    admin_mode=admin_mode,
                    subscribe_results=subscribe_results,
                ):
                    init_success = False
        return init_success

    def start_communicating(
        self: ControllerComponentManager,
        *args: Any,
        admin_mode: AdminMode,
        **kwargs: Any,
    ) -> None:
        """
        Establish communication with all sub-devices.

        If any proxy fails to initialize, the communication state is set to
        NOT_ESTABLISHED and the method returns early. Otherwise it defers to
        the base class and reports PowerState.OFF.

        :param admin_mode: AdminMode to write to every sub-device
        """
        self.logger.info(
            "Entering ControllerComponentManager.start_communicating"
        )

        self._op_states_queue = Queue(maxsize=256)

        # Initialize device proxies
        if not self._init_device_proxies(admin_mode=admin_mode):
            self.logger.error("Failed to initialize proxies.")
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            return

        super().start_communicating(admin_mode=admin_mode)
        self._update_component_state(power=PowerState.OFF)

    def stop_communicating(
        self: ControllerComponentManager,
        *args: Any,
        admin_mode: AdminMode,
        **kwargs: Any,
    ) -> None:
        """
        Stop communicating with all sub-devices.

        :param admin_mode: AdminMode to write to every sub-device; change
            events are only unsubscribed when this is AdminMode.OFFLINE
        """
        self.logger.info(
            f"Entering ControllerComponentManager.stop_communicating: {admin_mode.name}"
        )

        # Order of operations for sub devices:
        # 1. Set the sub devices to the requested AdminMode (OFFLINE triggers
        #    their transition to DevState.DISABLE)
        # 2. If OFFLINE, unsubscribe from the change events with the sub devices
        # NOTE(review): no explicit wait for DevState.DISABLE happens here.
        for device_type, proxies in self.proxy_dict.items():
            for device_id, proxy in proxies.items():
                try:
                    self.logger.info(
                        f"Setting {device_type} {device_id} to {admin_mode.name}"
                    )
                    proxy.adminMode = admin_mode
                except tango.DevFailed as dev_failed:
                    # Best effort: log and carry on with the remaining devices
                    self.logger.error(
                        f"Failed to stop communications {admin_mode.name} with {device_type} {device_id}; {dev_failed}"
                    )

        if admin_mode == AdminMode.OFFLINE:
            try:
                self._event_handler.unsubscribe_events()
            except tango.DevFailed as dev_failed:
                self.logger.error(
                    f"Failed to unsubscribe from sub-device events; {dev_failed}"
                )

        super().stop_communicating(admin_mode=admin_mode)

    # --------
    # Commands
    # --------

    @staticmethod
    def _strict_schema_validate(config: Dict[str, Any], version: str) -> None:
        """
        Validate ``config`` against the ska-schemas schema for ``version``
        with strict validation (strictness=2) enabled.

        :param config: parameter dictionary to validate
        :param version: interface (schema) URI to validate against
        :raises ValueError: if ska-schemas validation fails
        """
        # Workaround to enable strict validation here.
        # Need to set both PYTEST_VERSION to something and
        # SKA_SCHEMAS_ALLOW_STRICT_VALIDATION to True.
        # These are process-wide and are intentionally left set afterwards.
        if environ.get("PYTEST_VERSION") is None:
            environ["PYTEST_VERSION"] = "7.2.0"
        environ["SKA_SCHEMAS_ALLOW_STRICT_VALIDATION"] = "True"
        schema_validate(version=version, config=config, strictness=2)

    # --- InitSysParam Command --- #

    def _validate_init_sys_param(
        self: ControllerComponentManager,
        params: str,
    ) -> tuple[bool, dict | None]:
        """
        Validate the InitSysParam against the ska-schemas schema.

        :param params: The InitSysParam parameters as a JSON-formatted str
        :return: (True, parsed parameter dict) if the InitSysParam parameters
            are valid, (False, None) otherwise
        """
        # Validate supported interface passed in the JSON string.
        # If invalid, validate_interface will return a message instead of the
        # param dict.
        valid, param_dict = validate_interface(params, "initsysparam")
        if not valid:
            self.logger.error(param_dict)
            return False, None

        # Validate init_sys_param against the telescope model
        try:
            self._strict_schema_validate(
                config=param_dict, version=param_dict["interface"]
            )
        except ValueError as value_error:
            self.logger.error(
                f"InitSysParam validation against ska-schemas schema failed with exception: {str(value_error)}"
            )
            return False, None

        self.logger.info(
            "InitSysParam validation against ska-schemas schema was successful!"
        )
        return True, param_dict

    def _retrieve_sys_param_file(
        self: ControllerComponentManager,
        init_sys_param_json: Dict[Any, Any],
    ) -> tuple[bool, Optional[Dict[Any, Any]]]:
        """
        Retrieve the sys_param file from the Telescope Model.

        The file is read from the first entry of ``tm_data_sources`` at
        ``tm_data_filepath``.

        :param init_sys_param_json: The InitSysParam parameters
        :return: (True, retrieved parameter dict) on success, (False, None)
            if retrieval raised ValueError or KeyError
        """
        # The uri was provided in the input string, therefore the mapping from Dish ID to
        # VCC and frequency offset k needs to be retrieved using the Telescope Model
        tm_data_sources = init_sys_param_json["tm_data_sources"][0]
        tm_data_filepath = init_sys_param_json["tm_data_filepath"]
        try:
            mid_cbf_param_dict = TMData([tm_data_sources])[
                tm_data_filepath
            ].get_dict()
        except (ValueError, KeyError) as error:
            self.logger.error(
                f"Retrieving the init_sys_param file failed with exception: {str(error)}"
            )
            return (False, None)

        self.logger.info(
            f"Successfully retrieved json data from {tm_data_filepath} in {tm_data_sources}"
        )
        return (True, mid_cbf_param_dict)

    def _update_init_sys_param(
        self: ControllerComponentManager,
        params: str,
    ) -> bool:
        """
        Write the InitSysParam parameters to the sysParam attribute of every
        subarray.

        Stops at the first subarray that fails.

        :param params: The InitSysParam parameters as a JSON-formatted str
        :return: True if every subarray was successfully updated,
            False otherwise
        """
        self.logger.debug(f"Updating sysParam: {params}")
        for device_id, proxy in self.proxy_dict[const.SUBARRAY].items():
            try:
                proxy.sysParam = params
            except tango.DevFailed as dev_failed:
                self.logger.error(
                    f"Failure in connection to Subarray {device_id}; {dev_failed}"
                )
                return False
        return True

    def init_sys_param(
        self: ControllerComponentManager,
        sys_params: str,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[Event] = None,
    ) -> None:
        """
        Validate and save the Dish ID - VCC ID mapping and k values.

        If the input contains ``tm_data_filepath``, the actual parameters are
        fetched from the Telescope Model and validated again. The original
        input is then kept in ``source_init_sys_param``.

        :param sys_params: the Dish ID - VCC ID mapping and k values in a
            json string.
        :param task_callback: Callback function to update task status
        :param task_abort_event: Event to signal task abort.
        """
        self.logger.debug(f"Received sys params {sys_params}")
        task_callback(status=TaskStatus.IN_PROGRESS)
        if self.task_abort_event_is_set(
            "InitSysParam", task_callback, task_abort_event
        ):
            return

        valid, sys_param_dict = self._validate_init_sys_param(sys_params)
        if not valid:
            task_callback(
                result=(
                    ResultCode.FAILED,
                    "Validating init_sys_param file failed",
                ),
                status=TaskStatus.COMPLETED,
            )
            return

        # If tm_data_filepath is provided, then we need to retrieve the
        # init sys param file from CAR via the telescope model and re-validate.
        if "tm_data_filepath" in sys_param_dict:
            passed, sys_param_dict = self._retrieve_sys_param_file(
                sys_param_dict
            )
            if not passed:
                task_callback(
                    result=(
                        ResultCode.FAILED,
                        "Retrieving the init_sys_param file failed",
                    ),
                    status=TaskStatus.COMPLETED,
                )
                return
            valid, sys_param_dict = self._validate_init_sys_param(
                orjson.dumps(sys_param_dict).decode()
            )
            if not valid:
                task_callback(
                    result=(
                        ResultCode.FAILED,
                        "Validating init_sys_param file retrieved from tm_data_filepath against ska-schemas schema failed",
                    ),
                    status=TaskStatus.COMPLETED,
                )
                return
            self.source_init_sys_param = sys_params
            self.last_init_sys_param = orjson.dumps(sys_param_dict).decode()
        else:
            self.source_init_sys_param = ""
            self.last_init_sys_param = sys_params

        # Store the attribute
        self.dish_utils = DISHUtils(sys_param_dict)

        # Send init_sys_param to the subarrays.
        if not self._update_init_sys_param(self.last_init_sys_param):
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            task_callback(
                result=(
                    ResultCode.FAILED,
                    "Failed to update subarrays and/or VCCs with init_sys_param",
                ),
                status=TaskStatus.COMPLETED,
            )
            return

        task_callback(
            result=(
                ResultCode.OK,
                "InitSysParam completed OK",
            ),
            status=TaskStatus.COMPLETED,
        )

    # --- Set Resource Admin Mode Command --- #

    def _validate_set_resource_admin_mode(
        self: ControllerComponentManager,
        params: str,
    ) -> Optional[Dict]:
        """
        Validate the SetResourceAdminMode against the ska-schemas schema.

        :param params: The SetResourceAdminMode parameters in a json string.
        :return: the parsed parameter dict if the SetResourceAdminMode
            parameters are valid, None otherwise
        """
        # Validate supported interface passed in the JSON string.
        # If invalid, validate_interface will return a message instead of the
        # param dict.
        valid, set_resource_admin_mode_param = validate_interface(
            params, "setresourceadminmode"
        )
        if not valid:
            self.logger.error(set_resource_admin_mode_param)
            return None

        # Validate set_resource_admin_mode against the telescope model
        try:
            self._strict_schema_validate(
                config=set_resource_admin_mode_param,
                version=set_resource_admin_mode_param[param_keys.INTERFACE],
            )
        except ValueError as value_error:
            self.logger.error(
                f"SetResourceAdminMode validation against ska-schemas schema failed with exception: {str(value_error)}"
            )
            return None

        self.logger.info(
            "SetResourceAdminMode validation against ska-schemas schema was successful!"
        )
        return set_resource_admin_mode_param

    def set_resource_admin_mode(
        self: ControllerComponentManager,
        admin_mode_params: str,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[Event] = None,
    ) -> None:
        """
        Validate the request, then set the requested AdminMode on the listed
        VCC Units and FSP Units.

        :param admin_mode_params: the resource id and admin mode in a json string.
        :param task_callback: Callback function to update task status
        :param task_abort_event: Event to signal task abort.
        """
        self.logger.debug(f"Received admin mode params {admin_mode_params}")
        task_callback(status=TaskStatus.IN_PROGRESS)
        if self.task_abort_event_is_set(
            "SetResourceAdminMode", task_callback, task_abort_event
        ):
            return

        set_resource_admin_mode_param = self._validate_set_resource_admin_mode(
            admin_mode_params
        )
        if set_resource_admin_mode_param is None:
            task_callback(
                result=(
                    ResultCode.FAILED,
                    "Validating set_resource_admin_mode against ska-schemas schema failed",
                ),
                status=TaskStatus.COMPLETED,
            )
            return

        # MCS is not checking the state of VCC/FSP Unit before setting the admin mode
        # because the assumption is that the maintainer or operator are taking
        # the responsibility to check the state before setting the admin mode
        transaction_id = set_resource_admin_mode_param.get(
            param_keys.TRANSACTION_ID
        )
        message = "Processing SetResourceAdminMode command"
        if transaction_id:
            message += f" with transaction ID: {transaction_id}"
        self.logger.info(message)

        admin_mode = AdminMode[
            set_resource_admin_mode_param[param_keys.ADMIN_MODE]
        ]

        # vcc_unit_ids and fsp_unit_ids are both optional parameters.
        # IDs index proxy_dict, whose keys are the 1-based ints assigned in
        # _init_device_proxies.
        requested_ids: Dict[str, List[int]] = {
            const.VCC_UNIT: set_resource_admin_mode_param.get(
                param_keys.VCC_UNIT_IDS, []
            ),
            const.FSP_UNIT: set_resource_admin_mode_param.get(
                param_keys.FSP_UNIT_IDS, []
            ),
        }
        devices_failed: Dict[str, List[int]] = {
            const.VCC_UNIT: [],
            const.FSP_UNIT: [],
        }
        for device_type, device_ids in requested_ids.items():
            for device_id in device_ids:
                try:
                    device_proxy = self.proxy_dict[device_type][device_id]
                    device_proxy.adminMode = admin_mode
                except (KeyError, tango.DevFailed) as dev_failed:
                    # KeyError: unknown device ID; DevFailed: write rejected
                    self.logger.error(
                        f"Unable to set {device_type} {device_id} adminMode to {admin_mode.name}; {dev_failed}"
                    )
                    devices_failed[device_type].append(device_id)

        if devices_failed[const.VCC_UNIT] or devices_failed[const.FSP_UNIT]:
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            task_callback(
                result=(
                    ResultCode.FAILED,
                    f"Failed to update adminMode to {admin_mode.name} for the following devices: {devices_failed}",
                ),
                status=TaskStatus.COMPLETED,
            )
            return

        # TODO CIP-4089: Setting the admin mode for FSP Units in future story
        # NOTE(review): FSP Unit IDs are already handled by the loop above;
        # confirm what remains for CIP-4089.
        task_callback(
            result=(
                ResultCode.OK,
                "SetResourceAdminMode completed OK",
            ),
            status=TaskStatus.COMPLETED,
        )

    # --- On Command --- #

    def on(
        self: ControllerComponentManager,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[Event] = None,
    ) -> None:
        """
        Turn on the controller.

        Currently this only sets the controller power state to ON and reports
        success. No subordinate devices are commanded.

        :param task_callback: Callback function to update task status.
        :param task_abort_event: Event to signal task abort.
        """
        self.logger.debug("Entering ControllerComponentManager.on")
        task_callback(status=TaskStatus.IN_PROGRESS)
        if self.task_abort_event_is_set("On", task_callback, task_abort_event):
            return

        # TODO: for AA2 On/Off commands will be removed
        self._update_component_state(power=PowerState.ON)
        task_callback(
            result=(ResultCode.OK, "On completed OK"),
            status=TaskStatus.COMPLETED,
        )

    # --- Off Command --- #

    def off(
        self: ControllerComponentManager,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[Event] = None,
    ) -> None:
        """
        Turn off the controller.

        1. Every subarray not in ObsState.EMPTY is sent Abort and then Restart
           as group commands.
        2. If any of those commands does not return ResultCode.OK, the command
           fails and the power state is left unchanged.
        3. Otherwise the controller power state is set to OFF.

        :param task_callback: Callback function to update task status
        :param task_abort_event: Event to signal task abort.
        """
        self.logger.debug("Entering ControllerComponentManager.off")
        task_callback(status=TaskStatus.IN_PROGRESS)
        if self.task_abort_event_is_set(
            "Off", task_callback, task_abort_event
        ):
            return

        # If any subarray is not in ObsState.EMPTY, issue Abort and Restart commands.
        # Off command will fail if any subarray commands fail.
        subarrays_to_empty = [
            subarray
            for subarray in self.proxy_dict[const.SUBARRAY].values()
            if subarray.obsState != ObsState.EMPTY
        ]
        if subarrays_to_empty:
            abort_results = self.issue_group_command(
                command_name="Abort",
                proxies=subarrays_to_empty,
                check_on_fail={
                    "obsState": lambda attr_value: attr_value
                    == ObsState.ABORTED,
                },
            )
            restart_results = self.issue_group_command(
                command_name="Restart",
                proxies=subarrays_to_empty,
                check_on_fail={
                    "obsState": lambda attr_value: attr_value
                    == ObsState.EMPTY,
                },
            )
            if any(
                result_code != ResultCode.OK
                for result_code in (
                    *abort_results.values(),
                    *restart_results.values(),
                )
            ):
                task_callback(
                    result=(ResultCode.FAILED, "Off command failed"),
                    status=TaskStatus.COMPLETED,
                )
                return

        self._update_component_state(power=PowerState.OFF)
        task_callback(
            result=(ResultCode.OK, "Off completed OK"),
            status=TaskStatus.COMPLETED,
        )

    # ----------------
    # Helper Functions
    # ----------------

    # --- Resource Status -- #

    @property
    def resource_status(self: ControllerComponentManager) -> str:
        """
        Getter Function for resource_status.

        Retrieves the ResourceStatus values from available VCC and FSP units.
        Each unit's resourceStatus JSON is merged into a per-type dict, and the
        result is formatted according to the ResourceStatus schema in SKA
        Telmodel.

        :return: A JSON string representation of MCS's Resource Status
        :rtype: str
        """
        # .get(..., {}) so that, before start_communicating has created any
        # proxies, an empty status is reported instead of raising KeyError.
        vcc_resource_status_dict = {}
        for vcc_unit_proxy in self.proxy_dict.get(const.VCC_UNIT, {}).values():
            vcc_resource_status_dict.update(
                orjson.loads(vcc_unit_proxy.resourceStatus)
            )

        fsp_resource_status_dict = {}
        for fsp_unit_proxy in self.proxy_dict.get(const.FSP_UNIT, {}).values():
            fsp_resource_status_dict.update(
                orjson.loads(fsp_unit_proxy.resourceStatus)
            )

        resource_status_dict = {
            "interface": CBF_RESOURCESTATUS_VER1_0,
            "vcc": vcc_resource_status_dict,
            "fsp": fsp_resource_status_dict,
        }
        return orjson.dumps(resource_status_dict).decode()