Source code for ska_mid_cbf_mcs.controller.controller_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

from __future__ import annotations

from os import environ
from queue import Queue
from threading import Event
from typing import Any, Callable, Dict, List, Optional

import orjson
import tango
from ska_control_model import AdminMode, ObsState, PowerState, TaskStatus
from ska_schemas.schema import validate as schema_validate
from ska_tango_base.commands import ResultCode
from ska_tango_testing import context
from ska_telmodel_client.frontend import TMData

from ska_mid_cbf_mcs.base.component_manager import (
    CbfComponentManager,
    CommunicationStatus,
)
from ska_mid_cbf_mcs.commons.dish_utils import DISHUtils
from ska_mid_cbf_mcs.commons.global_enum import const, param_keys
from ska_mid_cbf_mcs.commons.validate_interface import (
    CBF_RESOURCESTATUS_VER1_0,
    validate_interface,
)
from ska_mid_cbf_mcs.commons.vcc_unit_utils import VCCUnitUtil

__all__ = ["ControllerComponentManager"]


[docs] class ControllerComponentManager(CbfComponentManager): """ A component manager for the CbfController device. """ def __init__( self: ControllerComponentManager, *args: Any, fqdn_dict: Dict[str, List[str]], max_capabilities: Dict[str, int], vcc_id_to_vcc_unit_id_mapping: List[str], **kwargs: Any, ) -> None: """ Initialise a new instance. :param fqdn_dict: dictionary containing FQDNs for the controller's sub-elements :param max_capabilities: dictionary containing maximum number of sub-elements """ super().__init__(*args, **kwargs) self.validate_supported_configuration = True # --- Capabilities --- # self._count_subarray = max_capabilities[const.SUBARRAY] self._count_vcc = max_capabilities[const.VCC] self._count_fsp = max_capabilities[const.FSP] # --- FQDNs --- # # TODO (CIP-3072): for now, controller sets all FHS simulators ONLINE self._fqdn_dict = fqdn_dict self.dish_utils = None self.last_init_sys_param = "" self.source_init_sys_param = "" # device type dictionary self._device_types = [ const.SUBARRAY, const.VCC_UNIT, const.FSP_UNIT, ] # Vcc Unit utils self._vcc_unit_util = VCCUnitUtil(vcc_id_to_vcc_unit_id_mapping) # ------------- # Communication # ------------- # --- Start Communicating --- # def _init_device_proxy( self: ControllerComponentManager, device_type: str, fqdn: str, device_id: int, admin_mode: AdminMode, subscribe_results: bool = False, subscribe_state: bool = False, ) -> bool: """ Initialize the device proxy from its FQDN, store the proxy in the _proxies dictionary, and set the AdminMode to ONLINE :param fqdn: FQDN of the device :param subscribe_results: True if should subscribe to the device's longRunningCommandResult attribute; defaults to False :param subscribe_state: True if should subscribe to the device's state attribute; defaults to False :return: True if the device proxy is successfully initialized, False otherwise. """ proxy = None if device_id not in self.proxy_dict[device_type]: try: self.logger.info(f"Establishing connection to {fqdn}") proxy = context.DeviceProxy(device_name=fqdn) self.proxy_dict[device_type][device_id] = proxy except (tango.DevFailed, Exception) as dev_failed: self.logger.error( f"Failure in connection to {fqdn}: {dev_failed}" ) return False else: proxy = self.proxy_dict[device_type][device_id] if subscribe_results: try: self._event_handler.subscribe_events( proxy=proxy, change_event_callbacks={ "longRunningCommandResult": self.results_callback }, ) except tango.DevFailed as dev_failed: self.logger.error( f"Failed to subscribe to {fqdn}/longRunningCommandResult attribute change events; {dev_failed}" ) return False if subscribe_state: try: self._op_states[fqdn] = None self._event_handler.subscribe_events( proxy=proxy, change_event_callbacks={"state": self.op_state_callback}, ) except tango.DevFailed as dev_failed: self.logger.error( f"Failed to subscribe to {fqdn}/state attribute change events; {dev_failed}" ) return False try: self.logger.info(f"Setting {fqdn} {admin_mode.name}") proxy.adminMode = admin_mode except tango.DevFailed as dev_failed: self.logger.error(f"Failed to set AdminMode: {dev_failed}") return False return True def _init_device_proxies( self: ControllerComponentManager, admin_mode: AdminMode ) -> bool: """ Initialize all device proxies. :return: True if the device proxies are all successfully initialized, False otherwise. """ init_success = True # TODO (CIP-3072): for now, controller sets all FHS simulators ONLINE for device_type in self._device_types: subscribe_results = False if device_type == const.SUBARRAY: subscribe_results = True self.proxy_dict[device_type] = {} for device_id, fqdn in enumerate(self._fqdn_dict[device_type], 1): if not self._init_device_proxy( device_type=device_type, fqdn=fqdn, device_id=device_id, admin_mode=admin_mode, subscribe_results=subscribe_results, ): init_success = False return init_success
[docs] def start_communicating( self: ControllerComponentManager, *args: Any, admin_mode: AdminMode, **kwargs: Any, ) -> None: """ Thread for start_communicating operation. """ self.logger.info( "Entering ControllerComponentManager.start_communicating" ) self._op_states_queue = Queue(maxsize=256) # Initialize device proxies if not self._init_device_proxies(admin_mode=admin_mode): self.logger.error("Failed to initialize proxies.") self._update_communication_state( communication_state=CommunicationStatus.NOT_ESTABLISHED ) return super().start_communicating(admin_mode=admin_mode) self._update_component_state(power=PowerState.OFF)
[docs] def stop_communicating( self: ControllerComponentManager, *args: Any, admin_mode: AdminMode, **kwargs: Any, ) -> None: """ Thread for stop_communicating operation. """ self.logger.info( f"Entering ControllerComponentManager.stop_communicating: {admin_mode.name}" ) # Order of operations for sub devices: # 1. Set the sub devices to AdminMode.OFFLINE to trigger transition to DevState.DISABLE # 2. Wait for the sub devices to transition to DevState.DISABLE # 3. Unsubscribe from the change events with the sub devices for device_type, proxy_dict in self.proxy_dict.items(): for device_id, proxy in proxy_dict.items(): try: self.logger.info( f"Setting {device_type} {device_id} to {admin_mode.name}" ) proxy.adminMode = admin_mode except tango.DevFailed as dev_failed: self.logger.info( f"Failed to stop communications {admin_mode.name} with {device_type} {device_id}; {dev_failed}" ) continue if admin_mode == AdminMode.OFFLINE: try: self._event_handler.unsubscribe_events() except tango.DevFailed as dev_failed: self.logger.error( f"Failed to unsubscribe from subarray events; {dev_failed}" ) super().stop_communicating(admin_mode=admin_mode)
# -------- # Commands # -------- # --- InitSysParam Command --- # def _validate_init_sys_param( self: ControllerComponentManager, params: str, ) -> tuple[bool, dict | None]: """ Validate the InitSysParam against the ska-schemas schema :param params: The InitSysParam parameters as a JSON-formatted str :return: True if the InitSysParam parameters are valid, False otherwise """ # Validate supported interface passed in the JSON string. # If invalid, validate_interface will return a message instead of the # param dict. (valid, param_dict) = validate_interface(params, "initsysparam") if not valid: self.logger.error(param_dict) return False, None # Validate init_sys_param against the telescope model try: # Workaround to enable strict validation here. # Need to set both PYTEST_VERSION to something and # SKA_SCHEMAS_ALLOW_STRICT_VALIDATION to True pytest_version = environ.get("PYTEST_VERSION") if pytest_version is None: environ["PYTEST_VERSION"] = "7.2.0" environ["SKA_SCHEMAS_ALLOW_STRICT_VALIDATION"] = "True" schema_validate( version=param_dict["interface"], config=param_dict, strictness=2, ) self.logger.info( "InitSysParam validation against ska-schemas schema was successful!" ) except ValueError as value_error: self.logger.error( f"InitSysParam validation against ska-schemas schema failed with exception: {str(value_error)}" ) return False, None return True, param_dict def _retrieve_sys_param_file( self: ControllerComponentManager, init_sys_param_json: Dict[Any, Any], ) -> tuple[bool, Dict[Any, Any]]: """ Retrieve the sys_param file from the Telescope Model :param init_sys_param_json: The InitSysParam parameters """ # The uri was provided in the input string, therefore the mapping from Dish ID to # VCC and frequency offset k needs to be retrieved using the Telescope Model tm_data_sources = init_sys_param_json["tm_data_sources"][0] tm_data_filepath = init_sys_param_json["tm_data_filepath"] try: mid_cbf_param_dict = TMData([tm_data_sources])[ tm_data_filepath ].get_dict() self.logger.info( f"Successfully retrieved json data from {tm_data_filepath} in {tm_data_sources}" ) except (ValueError, KeyError) as error: self.logger.error( f"Retrieving the init_sys_param file failed with exception: {str(error)}" ) return (False, None) return (True, mid_cbf_param_dict) def _update_init_sys_param( self: ControllerComponentManager, params: str, ) -> bool: """ Update the InitSysParam parameters in the subarrays and VCCs :param params: The InitSysParam parameters :return: True if the InitSysParam parameters are successfully updated, False otherwise """ # Write the init_sys_param to each of the subarrays self.logger.debug(f"Updating sysParam: {params}") for device_id, proxy in self.proxy_dict[const.SUBARRAY].items(): try: proxy.sysParam = params except tango.DevFailed as dev_failed: self.logger.error( f"Failure in connection to Subarray {device_id}; {dev_failed}" ) return False return True
[docs] def init_sys_param( self: ControllerComponentManager, sys_params: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Validate and save the Dish ID - VCC ID mapping and k values. :param sys_params: the Dish ID - VCC ID mapping and k values in a json string. :param task_callback: Callback function to update task status :param task_abort_event: Event to signal task abort. """ self.logger.debug(f"Received sys params {sys_params}") task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( "InitSysParam", task_callback, task_abort_event ): return valid, sys_param_dict = self._validate_init_sys_param(sys_params) if not valid: task_callback( result=( ResultCode.FAILED, "Validating init_sys_param file failed", ), status=TaskStatus.COMPLETED, ) return # If tm_data_filepath is provided, then we need to retrieve the # init sys param file from CAR via the telescope model and re-validate. if "tm_data_filepath" in sys_param_dict: passed, sys_param_dict = self._retrieve_sys_param_file( sys_param_dict ) if not passed: task_callback( result=( ResultCode.FAILED, "Retrieving the init_sys_param file failed", ), status=TaskStatus.COMPLETED, ) return valid, sys_param_dict = self._validate_init_sys_param( orjson.dumps(sys_param_dict).decode() ) if not valid: task_callback( result=( ResultCode.FAILED, "Validating init_sys_param file retrieved from tm_data_filepath against ska-schemas schema failed", ), status=TaskStatus.COMPLETED, ) return self.source_init_sys_param = sys_params self.last_init_sys_param = orjson.dumps(sys_param_dict).decode() else: self.source_init_sys_param = "" self.last_init_sys_param = sys_params # Store the attribute self.dish_utils = DISHUtils(sys_param_dict) # Send init_sys_param to the subarrays and VCCs. if not self._update_init_sys_param(self.last_init_sys_param): self._update_communication_state( communication_state=CommunicationStatus.NOT_ESTABLISHED ) task_callback( result=( ResultCode.FAILED, "Failed to update subarrays and/or VCCs with init_sys_param", ), status=TaskStatus.COMPLETED, ) return task_callback( result=( ResultCode.OK, "InitSysParam completed OK", ), status=TaskStatus.COMPLETED, ) return
# --- Set Resource Admin Mode Command --- # def _validate_set_resource_admin_mode( self: ControllerComponentManager, params: str, ) -> Dict: """ Validate the SetResourceAdminMode against the ska-schemas schema :param params: The SetResourceAdminMode parameters in a json string. :return: True if the SetResourceAdminMode parameters are valid, False otherwise """ # Validate supported interface passed in the JSON string set_resource_admin_mode_param = {} # If invalid, validate_interface will return a message instead of the # param dict. (valid, set_resource_admin_mode_param) = validate_interface( params, "setresourceadminmode" ) if not valid: self.logger.error(set_resource_admin_mode_param) return None # Validate set_resource_admin_mode against the telescope model try: # Workaround to enable strict validation here. # Need to set both PYTEST_VERSION to something and # SKA_SCHEMAS_ALLOW_STRICT_VALIDATION to True pytest_version = environ.get("PYTEST_VERSION") if pytest_version is None: environ["PYTEST_VERSION"] = "7.2.0" environ["SKA_SCHEMAS_ALLOW_STRICT_VALIDATION"] = "True" schema_validate( version=set_resource_admin_mode_param[param_keys.INTERFACE], config=set_resource_admin_mode_param, strictness=2, ) self.logger.info( "SetResourceAdminMode validation against ska-schemas schema was successful!" ) except ValueError as value_error: self.logger.error( f"SetResourceAdminMode validation against ska-schemas schema failed with exception: {str(value_error)}" ) return None return set_resource_admin_mode_param
[docs] def set_resource_admin_mode( self: ControllerComponentManager, admin_mode_params: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Validate and save the resource id and admin mode. :param admin_mode_params: the resource id and admin mode in a json string. :param task_callback: Callback function to update task status :param task_abort_event: Event to signal task abort. """ self.logger.debug(f"Received admin mode params {admin_mode_params}") task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( "SetResourceAdminMode", task_callback, task_abort_event ): return set_resource_admin_mode_param = self._validate_set_resource_admin_mode( admin_mode_params ) if set_resource_admin_mode_param is None: task_callback( result=( ResultCode.FAILED, "Validating set_resource_admin_mode against ska-schemas schema failed", ), status=TaskStatus.COMPLETED, ) return # MCS is not checking the state of VCC/FSP Unit before setting the admin mode # because the assumption is that the maintainer or operator are taking # the responsibility to check the state before setting the admin mode transaction_id = set_resource_admin_mode_param.get( param_keys.TRANSACTION_ID ) message = "Processing SetResourceAdminMode command" if transaction_id: message += f" with transaction ID: {transaction_id}" self.logger.info(message) admin_mode = AdminMode[ set_resource_admin_mode_param[param_keys.ADMIN_MODE] ] devices_failed = { const.VCC_UNIT: [], const.FSP_UNIT: [], } def set_admin_mode_for_device_type( device_type: str, device_ids: List[str] ) -> None: for device_id in device_ids: try: device_proxy = self.proxy_dict[device_type][device_id] device_proxy.adminMode = admin_mode except (KeyError, tango.DevFailed) as dev_failed: self.logger.error( f"Unable to set {device_type} {device_id} adminMode to {admin_mode}; {dev_failed}" ) devices_failed[device_type].append(device_id) # vcc_unit_ids is an optional parameter vcc_unit_ids = set_resource_admin_mode_param.get( param_keys.VCC_UNIT_IDS, [] ) set_admin_mode_for_device_type(const.VCC_UNIT, vcc_unit_ids) # fsp_unit_ids is an optional parameter fsp_unit_ids = set_resource_admin_mode_param.get( param_keys.FSP_UNIT_IDS, [] ) set_admin_mode_for_device_type(const.FSP_UNIT, fsp_unit_ids) if devices_failed[const.VCC_UNIT] or devices_failed[const.FSP_UNIT]: self._update_communication_state( communication_state=CommunicationStatus.NOT_ESTABLISHED ) task_callback( result=( ResultCode.FAILED, f"Failed to update adminMode to {admin_mode} for the following devices: {devices_failed}", ), status=TaskStatus.COMPLETED, ) return # TODO CIP-4089: Setting the admin mode for FSP Units in future story task_callback( result=( ResultCode.OK, "SetResourceAdminMode completed OK", ), status=TaskStatus.COMPLETED, ) return
# --- On Command --- #
[docs] def on( self: ControllerComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Turn on the controller and its subordinate devices. This command will succeed and set controller operational state to ON if at least 1 LRU is successfully powered ON. :param task_callback: Callback function to update task status. :param task_abort_event: Event to signal task abort. """ self.logger.debug("Entering ControllerComponentManager.on") task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set("On", task_callback, task_abort_event): return # TODO: for AA2 On/Off commands will be removed self._update_component_state(power=PowerState.ON) task_callback( result=(ResultCode.OK, "On completed OK"), status=TaskStatus.COMPLETED, ) return
# --- Off Command --- #
[docs] def off( self: ControllerComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Turn off the controller and its subordinate devices. This command will succeed and set controller operational state to OFF only if the entire subordinate system is returned to OFF state. :param task_callback: Callback function to update task status :param task_abort_event: Event to signal task abort. """ self.logger.debug("Entering ControllerComponentManager.off") task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( "Off", task_callback, task_abort_event ): return # If any subarray is not in ObsState.EMPTY, issue Abort and Restart commands. # Off command will fail if any subarray commands fail. subarrays_to_empty = [ subarray for subarray in self.proxy_dict[const.SUBARRAY].values() if subarray.obsState != ObsState.EMPTY ] if len(subarrays_to_empty) > 0: abort_results = self.issue_group_command( command_name="Abort", proxies=subarrays_to_empty, check_on_fail={ "obsState": lambda attr_value: attr_value == ObsState.ABORTED, }, ) restart_results = self.issue_group_command( command_name="Restart", proxies=subarrays_to_empty, check_on_fail={ "obsState": lambda attr_value: attr_value == ObsState.EMPTY, }, ) if any( result_code != ResultCode.OK for result_code in list(abort_results.values()) + list(restart_results.values()) ): task_callback( result=(ResultCode.FAILED, "Off command failed"), status=TaskStatus.COMPLETED, ) return self._update_component_state(power=PowerState.OFF) task_callback( result=(ResultCode.OK, "Off completed OK"), status=TaskStatus.COMPLETED, )
# ---------------- # Helper Functions # ---------------- # --- Resource Status -- # @property def resource_status(self) -> str: """ Getter Function for resource_status. Retrieves the ResourceStatus values from available VCC and FSP units and formats it according to ResourceStatus schema in SKA Telmodel. :return: A JSON string representation of MCS's Resource Status :rtype: str """ resource_status_dict = {} resource_status_dict["interface"] = CBF_RESOURCESTATUS_VER1_0 vcc_resource_status_dict = {} fsp_resource_status_dict = {} for _, vcc_unit_proxies in (self.proxy_dict[const.VCC_UNIT]).items(): vcc_resource_status = vcc_unit_proxies.resourceStatus vcc_resource_status_dict.update(orjson.loads(vcc_resource_status)) resource_status_dict["vcc"] = vcc_resource_status_dict # Enable when FSP Units are implemented for _, fsp_unit_proxies in (self.proxy_dict[const.FSP_UNIT]).items(): fsp_resource_status = fsp_unit_proxies.resourceStatus fsp_resource_status_dict.update(orjson.loads(fsp_resource_status)) resource_status_dict["fsp"] = fsp_resource_status_dict return orjson.dumps(resource_status_dict).decode()