Source code for ska_mid_cbf_mcs.controller.controller_device

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

from __future__ import annotations

from typing import Dict, List, cast

from ska_tango_base.long_running_commands import LRCReqType
from ska_tango_base.type_hints import DevVarLongStringArrayType
from ska_tango_base.utils import convert_dict_to_list
from tango import DevFailed, DevState
from tango.server import attribute, command, device_property

from ska_mid_cbf_mcs.base.base_device import CbfDevice
from ska_mid_cbf_mcs.commons.global_enum import const
from ska_mid_cbf_mcs.controller.controller_component_manager import (
    ControllerComponentManager,
)


[docs] class CbfController(CbfDevice): """ CbfController TANGO device class. Primary point of contact for monitoring and control of Mid.CBF. Implements state and mode indicators, and a set of state transition commands. """ # ----------------------------------------------------------------------- # # Attributes # # ----------------------------------------------------------------------- # # --- Device Properties --- # SubarrayFQDNs = device_property( doc="Fully Qualified Domain Names (FQDNs) of the MCS CbfSubarray devices.", dtype=[str], mandatory=True, ) VccUnitFQDNs = device_property( doc="Fully Qualified Domain Names (FQDNs) of the MCS VccUnit devices.", dtype=[str], mandatory=True, ) FspUnitFQDNs = device_property( doc="Fully Qualified Domain Names (FQDNs) of the MCS FspUnit devices.", dtype=[str], mandatory=True, ) MaxCapabilities = device_property( doc="Maximum Mid.CBF resource capabilities, including subarrays, VCCs and FSPs.", dtype=[str], mandatory=True, ) VccIDToVccUnitIDMapping = device_property( doc="Mapping of VCC IDs to VCC Unit IDs.", dtype=[str], mandatory=True, ) # ----------------------------------------------------------------------- # # Methods # # ----------------------------------------------------------------------- # # --- Device Attributes --- # @attribute( dtype=str, doc="Maps Dish ID to VCC and frequency offset k. The string is in JSON format.", ) def sysParam(self: CbfController) -> str: """ :return: the mapping from Dish ID to VCC and frequency offset k. The string is in JSON format. :rtype: str """ return self.component_manager.last_init_sys_param @attribute( dtype=str, doc="Source and file path to the file to be retrieved through the Telescope Model. The string is in JSON format.", ) def sourceSysParam(self: CbfController) -> str: """ :return: the location of the json file that contains the mapping from Dish ID to VCC and frequency offset k, to be retrieved using the Telescope Model. :rtype: str """ return self.component_manager.source_init_sys_param @attribute( dtype=[str], doc="Dish ID to VCC mapping. The string is in the format 'dishID:vccID'.", max_dim_x=const.MAX_VCC, ) def dishToVcc(self: CbfController) -> List[str]: """ Return dishToVcc attribute: 'dishID:vccID' """ if self.component_manager.dish_utils is None: return [] out_str = [ f"{r}:{v}" for r, v in self.component_manager.dish_utils.dish_id_to_vcc_id.items() ] return out_str @attribute( dtype=[str], doc="VCC to Dish mapping. The string is in the format 'vccID:dishID'.", max_dim_x=const.MAX_VCC, ) def vccToDish(self: CbfController) -> List[str]: """ Return dishToVcc attribute: 'vccID:dishID' """ if self.component_manager.dish_utils is None: return [] out_str = [ f"{v}:{r}" for r, v in self.component_manager.dish_utils.dish_id_to_vcc_id.items() ] return out_str @attribute( dtype=[str], max_dim_x=4, doc=( "Maximum number of instances of each capability type," " e.g. 'VCC:4', 'FSP:4', 'Subarray:1'." ), ) def maxCapabilities(self: CbfController) -> List[str]: """ Read maximum number of instances of each capability type, as defined in charts. By default, these include VCC, FSP, and Subarray. :return: list of maximum number of instances of each capability type """ return convert_dict_to_list(self._max_capabilities) @attribute( dtype=bool, doc="Flag to indicate if a restrictive validation is requested for " "supported configurations, such as with Scan Configurations. " "Defaults to True. Setting the flag to False can cause unexpected " "behaviour with the system.", ) def validateSupportedConfiguration(self: CbfController) -> bool: """ Reads and return the value in the validateSupportedConfiguration :return: the value in validateSupportedConfiguration :rtype: bool """ return self.component_manager.validate_supported_configuration @validateSupportedConfiguration.write def validateSupportedConfiguration( self: CbfController, value: bool ) -> None: """ Setting this flag to false allows the user to bypass validation that ensures the inputs are currently supported by the system. The option to bypass validation is intended for testing purposes only. Using unsupported configurations may result in unexpected system behaviour." A warning level log is created when the flag is set to False. :param value: Set the flag to True/False """ if value is False: msg = ( "Setting validateSupportedConfiguration to False. " "Using unsupported configurations may result in " "unexpected system behaviour." ) self.logger.warning(msg) self.component_manager.validate_supported_configuration = value @attribute( dtype=str, label="ResourceStatus", doc=" JSON string that contains ResourceStatus for all the FSP unit" " and VCC units available to the controller. The JSON string will be" " formatted according to the ResourceStatus schema" " found in SKA Telmodel.", ) def resourceStatus(self: CbfController) -> str: """ Reads and return the ResourceStatus JSON string value. The JSON string will be formatted according to the ResourceStatus schema found in SKA Telmodel. :return: Controller's ResourceStatus as a JSON string :rtype: str """ return self.component_manager.resource_status # --- Device Commands --- #
[docs] def is_On_allowed( self: CbfController, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ ) -> bool: """ Check if On is allowed. :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method will always return True; otherwise, task has been popped off the LRC queue, and the command allowance can be checked. :return: True if allowed, else False. """ if request_type is LRCReqType.ENQUEUE_REQ: return True self.logger.debug("Checking if On is allowed") if not self.component_manager.is_communicating: return False if self.component_manager.dish_utils is None: self.logger.warning( "On not allowed; Dish-VCC mapping has not been provided." ) return False return True
[docs] @command( dtype_out="DevVarLongStringArray", ) def On(self: CbfController) -> DevVarLongStringArrayType: """ Set Mid.CBF subarrays to AdminMode.ONLINE :return: tuple containing a return code and a unique command identifier """ return self.submit_long_running_command( command_name="On", task=self.component_manager.on, )
[docs] def is_Off_allowed( self: CbfController, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ ) -> bool: """ Check if Off is allowed. :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method will always return True; otherwise, task has been popped off the LRC queue, and the command allowance can be checked. :return: True if allowed, else False. """ if request_type is LRCReqType.ENQUEUE_REQ: return True self.logger.debug("Checking if Off is allowed") if not self.component_manager.is_communicating: return False return True
[docs] @command( dtype_out="DevVarLongStringArray", ) def Off(self: CbfController) -> DevVarLongStringArrayType: """ Set Mid.CBF subarrays to AdminMode.ONLINE :return: tuple containing a return code and a unique command identifier """ return self.submit_long_running_command( command_name="Off", task=self.component_manager.off, )
[docs] def is_InitSysParam_allowed( self: CbfController, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ ) -> bool: """ Check if InitSysParam is allowed. :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method will always return True; otherwise, task has been popped off the LRC queue, and the command allowance can be checked. :return: True if allowed, else False. """ if request_type is LRCReqType.ENQUEUE_REQ: return True self.logger.debug("Checking if InitSysParam is allowed") if not self.component_manager.is_communicating: return False if self.get_state() not in [DevState.OFF]: self.logger.error( f"InitSysParam only allowed in DevState.OFF; current state: {self.get_state()}" ) return False return True
[docs] @command( dtype_in="DevString", dtype_out="DevVarLongStringArray", doc_in="the Dish ID - VCC ID mapping and frequency offset (k) in a json string", doc_out="tuple containing a return code and a string message indicating the status of the command.", ) def InitSysParam( self: CbfController, sys_params: str ) -> DevVarLongStringArrayType: """ This command sets the Dish ID - VCC ID mapping and k values :param sys_params: the Dish ID - VCC ID mapping and frequency offset (k) in a json string. :return: tuple containing a return code and a unique command identifier """ return self.submit_long_running_command( command_name="InitSysParam", task=self.component_manager.init_sys_param, kwargs={"sys_params": sys_params}, )
[docs] def is_SetResourceAdminMode_allowed( self: CbfController, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ ) -> bool: """ Check if SetResourceAdminMode is allowed. :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method will always return True; otherwise, task has been popped off the LRC queue, and the command allowance can be checked. :return: True if allowed, else False. """ if request_type is LRCReqType.ENQUEUE_REQ: return True self.logger.debug("Checking if SetResourceAdminMode is allowed") if not self.component_manager.is_communicating: return False if self.get_state() not in [DevState.OFF]: self.logger.error( f"SetResourceAdminMode only allowed in DevState.OFF; current state: {self.get_state()}" ) return False return True
[docs] @command( dtype_in="DevString", dtype_out="DevVarLongStringArray", doc_in="the admin mode and the resource id (i.e. FSP and VCC Unit id) in a json string", doc_out="tuple containing a return code and a string message indicating the status of the command.", ) def SetResourceAdminMode( self: CbfController, admin_mode_params: str ) -> DevVarLongStringArrayType: """ This command sets the admin mode of the specified FSP and VCC Unit :param admin_mode_params: the admin mode and the resource id (i.e. FSP and VCC Unit id) in a json string :return: tuple containing a return code and a unique command identifier """ return self.submit_long_running_command( command_name="SetResourceAdminMode", task=self.component_manager.set_resource_admin_mode, kwargs={"admin_mode_params": admin_mode_params}, )
# --- Device Initialization --- # def _get_max_capabilities(self: CbfController) -> Dict[str, int]: """ Get maximum number of capabilities for VCC, FSP and Subarray. If property not found in db, then assign a default amount. :return: dictionary of maximum number of capabilities with capability type as key and max capability instances as value """ capabilities = [const.VCC, const.FSP, const.SUBARRAY, const.VCC_UNIT] max_capabilities = {} if self.MaxCapabilities: for max_capability in self.MaxCapabilities: ( capability_type, max_capability_instances, ) = max_capability.split(":") max_capabilities[capability_type] = int( max_capability_instances ) for capability in capabilities: if capability not in max_capabilities: raise DevFailed( f"{capability} capabilities not defined; MaxCapabilities device property must be updated in charts" ) else: raise DevFailed("MaxCapabilities device property not defined") return max_capabilities
[docs] def create_component_manager( self: CbfController, ) -> ControllerComponentManager: """ Create and return a component manager for this device. :return: a component manager for this device. """ fqdn_dict = { const.SUBARRAY: list(self.SubarrayFQDNs), const.VCC_UNIT: list(self.VccUnitFQDNs), const.FSP_UNIT: list(self.FspUnitFQDNs), } # Initialize the _max_capabilities variable for the component manager. self._max_capabilities = self._get_max_capabilities() return ControllerComponentManager( device=self, fqdn_dict=fqdn_dict, max_capabilities=self._max_capabilities, vcc_id_to_vcc_unit_id_mapping=self.VccIDToVccUnitIDMapping, logger=self.logger, health_state_callback=self._update_health_state, communication_state_callback=self._communication_state_changed, component_state_callback=self._component_state_changed, admin_mode_callback=self._admin_mode_perform_action, )
# --- Run Device Server --- # def main(*args: str, **kwargs: str) -> int: """ Entry point for module. :param args: positional arguments :param kwargs: keyword arguments :return: exit code """ return cast(int, CbfController.run_server(args=args or None, **kwargs)) if __name__ == "__main__": main()