Source code for ska_mid_cbf_mcs.vcc_unit.vcc_unit_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

from __future__ import annotations

from functools import partial
from typing import Any, List

import orjson

# tango imports
import tango
from ska_control_model import AdminMode, HealthState, PowerState
from ska_mid_cbf_common import HealthMonitorComponent, PolicyConfig
from ska_tango_testing import context

from ska_mid_cbf_mcs.base.component_manager import (
    CbfComponentManager,
    CommunicationStatus,
)
from ska_mid_cbf_mcs.commons.global_enum import const
from ska_mid_cbf_mcs.vcc_unit.vcc_unit_healthstate_policy import (
    VCC_ALL_BANDS_CONTROLLER_TEMPLATE,
    VCC_HARDWARE_ATTR,
    VCC_RESOURCE_TEMPLATE,
    VccHardwareAggregationPolicy,
    VccResourceAggregationPolicy,
    VccUnitAggregationPolicy,
)

__all__ = ["VccUnitComponentManager"]


[docs] class VccUnitComponentManager(CbfComponentManager): """ Component manager for Vcc Unit. """ def __init__( self: VccUnitComponentManager, *args: Any, mid_cbf_controller_fqdn: str, fhs_monitor_fqdn: str, vcc_all_bands_fqdns: List[str], device_id: int, **kwargs: Any, ) -> None: """ Initialize a new instance. :param mid_cbf_controller_fqdn: FQDN of the mid CBF controller :param fhs_host_controller_fqdn: FQDN of the FHS host controller :param vcc_all_bands_fqdns: FQDNs of VCC band devices :param device_id: the device id of the MCS VCC Device """ # supply operating state machine trigger keywords super().__init__(*args, **kwargs) # --- Attribute Values --- # # HealthState of the hardware self._health_state_hw = HealthState.UNKNOWN # HealthState of the VCC which aggregates the Signal processing and # hardware HealthState self._health_states_vccs = {} self._num_vcc_per_unit = len(vcc_all_bands_fqdns) for index in range(1, self._num_vcc_per_unit + 1): self._health_states_vccs[index] = HealthState.UNKNOWN self._mid_cbf_controller_fqdn = mid_cbf_controller_fqdn self._fhs_monitor_fqdn = fhs_monitor_fqdn self._vcc_all_bands_fqdns = vcc_all_bands_fqdns self._device_id = device_id # Setup the Health Monitor Component self._fqdn = self._device.get_name() self._hmc_vcc_unit = self._setup_vcc_unit_health_monitor_component() self._health_state_vccs = {} def _create_device_proxy( self: VccUnitComponentManager, ) -> None: # This function should be called to setup the proxy when the communication # is set online try: self.proxy_dict["controller"] = context.DeviceProxy( device_name=self._mid_cbf_controller_fqdn, ) self.proxy_dict[const.FHS_MONITOR] = context.DeviceProxy( device_name=self._fhs_monitor_fqdn ) self.proxy_dict[const.VCC_ALL_BANDS] = {} self._hmc_vcc_unit.subscribe_to_events( proxy=self.proxy_dict[const.FHS_MONITOR], attr_name="healthState", expected_enum_type=HealthState, ) # Note: There is a distinction between vcc_seq here and the VCC ID of the device. # - vcc_seq is the sequence index relative to the specific VCC Unit # such that each VCC Unit has VCC 1-6 # - VCC ID is absolute ID value of a VCC device within Mid.CBF ranging from 1-197 # Example: VCC Unit 2's vcc_seq 1 could contain VCC with a VCC_ID of 7 for vcc_seq, fqdn in enumerate(self._vcc_all_bands_fqdns, 1): # First create the proxy self.proxy_dict[const.VCC_ALL_BANDS][vcc_seq] = ( context.DeviceProxy( device_name=fqdn, ) ) # Then subscribe to the HealthState of the device self._hmc_vcc_unit.subscribe_to_events( proxy=self.proxy_dict[const.VCC_ALL_BANDS][vcc_seq], attr_name="healthState", expected_enum_type=HealthState, ) except tango.DevFailed as dev_failed: self.logger.error(f"{dev_failed}") return False return True
[docs] def start_communicating( self: VccUnitComponentManager, *args: Any, admin_mode: AdminMode, **kwargs: Any, ) -> None: """ Establish communication with the component, then start monitoring. """ # No need to re-create proxies if we already created them previously if self.is_communicating: self.logger.info( "Skipped creating proxies as they are already created" ) success = True else: success = self._create_device_proxy() self.logger.info("Created proxies") # Update healthStatesVcc attribute signal self._device.health_states_vcc_signal = orjson.dumps( self._health_states_vccs, option=orjson.OPT_NON_STR_KEYS ).decode() if not success: self.logger.error("Failed to initialize proxies.") self._update_communication_state( communication_state=CommunicationStatus.NOT_ESTABLISHED ) return if admin_mode == AdminMode.ENGINEERING: for _, proxy in self.proxy_dict[const.VCC_ALL_BANDS].items(): subarray_id = proxy.subarrayID if subarray_id != 0: vcc_seq = int(proxy.dev_name().split("/")[-1]) self.logger.error( f"Failed to set the admin mode to {admin_mode.name}. VCC {vcc_seq} is assigned to subarray {subarray_id}." ) return super().start_communicating(admin_mode=admin_mode) self._update_component_state(power=PowerState.ON)
[docs] def stop_communicating( self: VccUnitComponentManager, *args: Any, admin_mode: AdminMode, **kwargs: Any, ) -> None: """ Thread for stop_communicating operation. """ self._unsubscribe_all_events() super().stop_communicating(admin_mode=admin_mode)
def _unsubscribe_all_events(self): """ Unsubscribe from all events the VCC Unit has subscribed to previously """ self._hmc_vcc_unit.unsubscribe_from_events( proxy=self.proxy_dict[const.FHS_MONITOR], attr_names=["healthState"], ) for vcc_seq, _ in enumerate(self._vcc_all_bands_fqdns, 1): self._hmc_vcc_unit.unsubscribe_from_events( proxy=self.proxy_dict[const.VCC_ALL_BANDS][vcc_seq], attr_names=["healthState"], ) def _handle_vcc_all_bands_health_state_callback( self: VccUnitComponentManager, vcc_all_bands_id: int, health_state: HealthState, ) -> None: # TODO: Will be implemented in a future story as part of CIP-3599 self.logger.info("Handle VCC all bands health state") def _fhs_host_health_state_callback( self: VccUnitComponentManager, health_state: HealthState, ) -> None: # TODO: Will be implemented in a future story as part of CIP-3599 self.logger.info("Handle fhs host health state") def _handle_admin_mode_callback( self: VccUnitComponentManager, admin_mode: AdminMode, ) -> None: # TODO: Will be implemented in a future story as part of CIP-3599 self.logger.info("Handle admin mode") # --- Resource Status -- # @property def resource_status(self) -> str: """ Getter Function for resource_status When the variable is called, updates _resource_status and return its value :return: A JSON string representation of VCC's Resource Status :rtype: str """ return self.get_resource_status()
[docs] def get_resource_status(self: VccUnitComponentManager) -> str: """ Retrieves the the resource status from all available VCC devices and format it into a ResourceStatus VCC Status Object JSON string. Retrieves the following from the VCC devices: * used_by_subarray: The subarray that is using the VCC * health_state: HealthState of the VCC * admin_mode: AdminMode of the VCC * vcc_unit_id: This VCC Unit's ID :param vcc_unit_id: Device ID of the this VCC Unit :return: a JSON string :rtype: str """ vcc_rs_dict = {} for _, vcc_proxy in (self.proxy_dict[const.VCC_ALL_BANDS]).items(): vcc_id = int(vcc_proxy.dev_name().split("/")[-1]) vcc_rs_dict[vcc_id] = {} used_by_subarray = [] # For Unit and Integration testing: # We need to store sub_id in a variable since the sim override value # can only be retrieved once. sub_id = vcc_proxy.subarrayID # By default, FHS VCC sets subarrayID to 0 if the VCC is not used by # any subarrays. 0 is an invalid value for resourceStatus schema. if sub_id != 0: used_by_subarray.append(sub_id) vcc_rs_dict[vcc_id]["used_by_subarray"] = used_by_subarray vcc_rs_dict[vcc_id]["health_state"] = HealthState( vcc_proxy.healthState ).name vcc_rs_dict[vcc_id]["admin_mode"] = AdminMode( vcc_proxy.adminMode ).name vcc_rs_dict[vcc_id]["vcc_unit_id"] = self._device_id return orjson.dumps( vcc_rs_dict, option=orjson.OPT_NON_STR_KEYS ).decode()
# Health Monitor Component # --- VCC Unit Aggregate Health Monitor Component --- # def _vcc_signal_path_healthstate_attr(self) -> list[str]: """ Generate a list of all VCC Allbands Controller's HealthState to be monitor by HMC """ attr_list = [] for vcc_seq in range(1, self._num_vcc_per_unit + 1): attr_name = VCC_ALL_BANDS_CONTROLLER_TEMPLATE.format( vcc_seq=vcc_seq ) attr_list.append(attr_name) return attr_list # TODO: Curently this is only returning the internal_state. # ADR-128 decision will change this to have a detail report on why a device # failed def _vcc_unit_aggregate_health_state_info_callback( self, health_info: tango.DevString ): if health_info is None: self._device.health_info_signal = "" else: self._device.health_info_signal = health_info def _vcc_unit_health_state_policy(self) -> str: """ Generate all the policies required for the VCC Unit HMC Note that the sequence we add to the health_state_policies list determines the order that the policies will be processed """ health_state_policies = [] health_state_policies += self._gen_vcc_hardware_health_state_policy() health_state_policies += self._gen_vcc_resource_health_state_policy() health_state_policies += self._gen_vcc_unit_agg_health_state_policy() return health_state_policies def _setup_vcc_unit_health_monitor_component( self, ) -> HealthMonitorComponent: return HealthMonitorComponent( fqdn=self._fqdn, policies=self._vcc_unit_health_state_policy(), update_health_info_callback=self._vcc_unit_aggregate_health_state_info_callback, update_health_state_callback=self._push_health_state_update, logger=self.logger, ) # --- VCC Hardware Aggregate Health Monitor Component --- # def _gen_vcc_hardware_health_state_policy(self) -> list[PolicyConfig]: """ Generate the health state policy for VCC Hardware Health State Policy """ health_state_policies = [ # No Members given. # VccHardwareAggregationPolicy hardcoded to look for FHS Monitor and Network Switch HealthState # in the the internal_state PolicyConfig( name=VCC_HARDWARE_ATTR, members=[], policy=VccHardwareAggregationPolicy( self.logger, self._fqdn, fhs_monitor_fqdn=self._fhs_monitor_fqdn, network_switch_fqdn="", # TODO Add the Network Switch FQDN when defined ), callback=self._update_vcc_unit_hw_health_state, ), ] return health_state_policies def _gen_vcc_unit_agg_health_state_policy(self) -> list[PolicyConfig]: """ Generate the health state policy for VCC Aggregate Health State Policy """ health_state_policies = [ # VccUnitAggregationPolicy also hardcoded to look for VCC HW HealthState in the internal_state PolicyConfig( name="vccUnitAggregateHealthState", members=self._vcc_signal_path_healthstate_attr(), policy=VccUnitAggregationPolicy(self.logger, self._fqdn), ), ] return health_state_policies def _gen_vcc_resource_health_state_policy( self: VccUnitComponentManager, ) -> list[PolicyConfig]: """ Generate the health state policy for VCC Resource Health State Policy """ health_state_policies = [] for vcc_seq in range(1, self._num_vcc_per_unit + 1): health_state_policies.append( # VccResourceAggregationPolicy is also hardcoded to look # for a the VCC HW HealthState in the internal_state. PolicyConfig( name=VCC_RESOURCE_TEMPLATE.format(vcc_seq=vcc_seq), members=[ VCC_ALL_BANDS_CONTROLLER_TEMPLATE.format( vcc_seq=vcc_seq ) ], policy=VccResourceAggregationPolicy( self.logger, self._fqdn ), callback=partial( self._update_vcc_resource_state, vcc_seq=vcc_seq ), ) ) return health_state_policies def _update_vcc_unit_hw_health_state( self: VccUnitComponentManager, health_state: HealthState ) -> None: """ Push a health state update to the device. :param health_state: the new health state of the component manager. """ self._health_state_vccs[VCC_HARDWARE_ATTR] = health_state.name self._device.health_state_hw_signal = health_state def _update_vcc_resource_state( self: VccUnitComponentManager, health_state: HealthState, vcc_seq: int ) -> None: """ Push a health state update to the device. :param health_state: the new health state of the component manager. :param vcc_seq: The sequence index (relative to this VCC Unit) of the VCC Resource for this callback """ self._health_state_vccs[ VCC_RESOURCE_TEMPLATE.format(vcc_seq=vcc_seq) ] = health_state.name