# Source code for ska_mid_cbf_mcs.fsp_unit.fsp_unit_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

from __future__ import annotations

import traceback
from typing import Any, List

import orjson

# tango imports
import tango
from ska_control_model import AdminMode, HealthState, ObsMode, PowerState
from ska_mid_cbf_common.health.health_monitor_component import (
    HealthMonitorComponent,
)
from ska_mid_cbf_common.health.policy_configurations import PolicyConfig
from ska_tango_testing import context

from ska_mid_cbf_mcs.base.component_manager import (
    CbfComponentManager,
    CommunicationStatus,
)
from ska_mid_cbf_mcs.commons.global_enum import const
from ska_mid_cbf_mcs.fsp_unit.fsp_unit_health_policies import (
    FHS_MONITOR_TEMPLATE,
    FSP_HW_POLICY_NAME,
    FspHardwareAggregationPolicy,
    FspUnitAggregationPolicy,
)

__all__ = ["FspUnitComponentManager"]


class FspUnitComponentManager(CbfComponentManager):
    """
    Component manager for Fsp Unit.

    Holds a proxy to one Fsp device and to each configured FHSMonitor
    device, subscribes their ``healthState`` attribute to a
    HealthMonitorComponent, and reports the Fsp's resource status as JSON.
    """

    def __init__(
        self: FspUnitComponentManager,
        *args: Any,
        fsp_fqdn: str,
        fhs_monitor_fqdns: List[str],
        device_id: int,
        **kwargs: Any,
    ) -> None:
        """
        Initialize a new instance.

        :param fsp_fqdn: FQDN of the Fsp device
        :param fhs_monitor_fqdns: FQDNs of the FHSMonitor devices
        :param device_id: ID of this FSP Unit; reported as ``fsp_unit_id``
            by :meth:`get_resource_status`
        """
        super().__init__(*args, **kwargs)

        # --- Attribute Values --- #
        self._fsp_fqdn = fsp_fqdn
        self._fhs_monitor_fqdns = fhs_monitor_fqdns
        self.fqdn = self._device.get_name()
        self._device_id = device_id

        # Proxies are created lazily by _create_device_proxy(); until then
        # the FSP entry is an empty dict placeholder, not a proxy.
        self.proxy_dict = {
            const.FHS_MONITOR: {},
            const.FSP: {},
        }

        # HealthState
        self.health_state_hw = HealthState.UNKNOWN
        self.health_state_fsp = HealthState.UNKNOWN
        self.num_fhs_controllers_per_unit = len(self._fhs_monitor_fqdns)

        # Health Monitor Component
        # (the policy builder reads the attributes assigned above, so it
        # must be called after them)
        self.health_monitor = HealthMonitorComponent(
            fqdn=self.fqdn,
            policies=self._fsp_unit_health_state_policy(),
            update_health_info_callback=self.device_health_state_callback,
            update_health_state_callback=self._push_health_state_update,
            logger=self.logger,
        )
        self.logger.info("FSP Unit Init complete")

    def _create_device_proxy(
        self: FspUnitComponentManager,
    ) -> bool:
        """
        Create the Fsp and FHSMonitor proxies and subscribe each one's
        ``healthState`` attribute to the health monitor.

        Stops at the first failure; proxies stored before the failure are
        left in ``proxy_dict``.

        :return: True if every proxy was created and subscribed, False if
            any step raised (the error is logged)
        """
        self.logger.info(f"Creating proxy to {self._fsp_fqdn}")
        try:
            self.proxy_dict[const.FSP] = context.DeviceProxy(
                device_name=self._fsp_fqdn
            )
            self.logger.info(f"Created proxy for {self._fsp_fqdn}")

            # Then subscribe to the HealthState of the device
            self.health_monitor.subscribe_to_events(
                proxy=self.proxy_dict[const.FSP],
                attr_name="healthState",
                expected_enum_type=HealthState,
            )
            self.logger.info(f"Subscribed to HMC for {self._fsp_fqdn}")
        except tango.DevFailed as dev_failed:
            self.logger.error(
                f"Failed to connect to {self._fsp_fqdn}; {dev_failed}"
            )
            return False
        except Exception as exception:
            self.logger.error(f"{traceback.format_exc()}: {exception}")
            return False

        # FHSMonitor proxies are keyed by their 1-based position in the
        # configured FQDN list.
        for device_id, fqdn in enumerate(self._fhs_monitor_fqdns, 1):
            try:
                self.proxy_dict[const.FHS_MONITOR][device_id] = (
                    context.DeviceProxy(
                        device_name=fqdn,
                    )
                )
                self.logger.info(f"Created proxy for {fqdn}")

                # Then subscribe to the HealthState of the device
                self.health_monitor.subscribe_to_events(
                    proxy=self.proxy_dict[const.FHS_MONITOR][device_id],
                    attr_name="healthState",
                    expected_enum_type=HealthState,
                )
                self.logger.info(f"Subscribed to HMC for {fqdn}")
            except tango.DevFailed as dev_failed:
                self.logger.error(f"Failed to connect to {fqdn}; {dev_failed}")
                return False
            except Exception as exception:
                self.logger.error(f"{traceback.format_exc()}: {exception}")
                return False

        return True

    def start_communicating(
        self: FspUnitComponentManager,
        *args: Any,
        admin_mode: AdminMode,
        **kwargs: Any,
    ) -> None:
        """
        Establish communication with the component, then start monitoring.

        Creates the device proxies unless already communicating, writes
        ``admin_mode`` to the Fsp device, then delegates to the base class
        and reports the component as powered on.

        :param admin_mode: admin mode to write to the Fsp device and pass
            on to the base class
        """
        self.logger.info(
            "Entering fsp_unit_component_manager.start_communicating"
        )

        # No need to re-create proxies if we already created them previously
        if self.is_communicating:
            self.logger.info(
                "Skipped creating proxies as they are already created"
            )
        elif self._create_device_proxy():
            # Only claim success when the proxies really were created.
            self.logger.info("Created proxies")
        else:
            self.logger.error("Failed to initialize proxies.")
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            return

        try:
            self.proxy_dict[const.FSP].adminMode = admin_mode
        except tango.DevFailed as dev_failed:
            self.logger.error(
                f"Failed to set {self._fsp_fqdn} to {admin_mode}; {dev_failed}"
            )
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            return
        except Exception as exception:
            # NOTE(review): unlike the DevFailed branch, this path does not
            # update the communication state - confirm that is intended.
            self.logger.error(f"{traceback.format_exc()}: {exception}")
            return

        super().start_communicating(admin_mode=admin_mode)
        self._update_component_state(power=PowerState.ON)
        self.logger.info("FSP Unit Start communication complete")

    def stop_communicating(
        self: FspUnitComponentManager,
        *args: Any,
        admin_mode: AdminMode,
        **kwargs: Any,
    ) -> None:
        """
        Stop monitoring the component and break off communication.

        Unsubscribes ``healthState`` from the Fsp proxy, writes
        ``admin_mode`` to the Fsp device, unsubscribes ``healthState`` from
        every FHSMonitor proxy, then delegates to the base class. Any
        failure is logged and aborts the sequence before the base class is
        called.

        :param admin_mode: admin mode to write to the Fsp device and pass
            on to the base class
        """
        self.logger.info(
            "Entering fsp_unit_component_manager.stop_communicating"
        )

        try:
            self.health_monitor.unsubscribe_from_events(
                proxy=self.proxy_dict[const.FSP], attr_names=["healthState"]
            )
            self.proxy_dict[const.FSP].adminMode = admin_mode
        except tango.DevFailed as dev_failed:
            self.logger.error(
                f"Failed to set {self._fsp_fqdn} to {admin_mode}; {dev_failed}"
            )
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            return
        except Exception as exception:
            # NOTE(review): communication state is not updated on this
            # path - confirm that is intended.
            self.logger.error(f"{traceback.format_exc()}: {exception}")
            return

        try:
            for proxy in self.proxy_dict[const.FHS_MONITOR].values():
                self.health_monitor.unsubscribe_from_events(
                    proxy=proxy,
                    attr_names=["healthState"],
                )
        except tango.DevFailed as dev_failed:
            self.logger.error(
                f"Failed to unsubscribe healthState from {const.FHS_MONITOR}"
            )
            self.logger.error(f"Dev Exception: {dev_failed}")
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            return
        except Exception as exception:
            self.logger.error(f"{traceback.format_exc()}: {exception}")
            return

        super().stop_communicating(admin_mode=admin_mode)
        self.logger.info("FSP Unit Stop communication complete")

    def _handle_admin_mode_callback(
        self: FspUnitComponentManager,
        admin_mode: AdminMode,
    ) -> None:
        """
        Placeholder admin mode handler; currently only logs.

        :param admin_mode: the reported admin mode (unused for now)
        """
        # TODO: Will be implemented in a future story
        self.logger.info("Handle fhs host admin mode")

    # --- Health Status -- #

    def _fsp_unit_health_state_policy(
        self: FspUnitComponentManager,
    ) -> List[PolicyConfig]:
        """
        Build the health policy configurations handed to the health
        monitor component.

        Two policies are produced, in this order: a hardware policy whose
        members are one FHS_MONITOR_TEMPLATE entry per configured
        FHSMonitor (numbered from 1), and the FSP Unit aggregate policy
        with no members.

        :return: the list of policy configurations
        """
        hw_members = [
            FHS_MONITOR_TEMPLATE.format(fhs_seq=fhs_seq)
            for fhs_seq in range(1, self.num_fhs_controllers_per_unit + 1)
        ]

        hw_policy = PolicyConfig(
            name=FSP_HW_POLICY_NAME,
            members=hw_members,
            policy=FspHardwareAggregationPolicy(
                self.logger,
                fhs_monitor_fqdns=self._fhs_monitor_fqdns,
                network_switch_fqdn="",
            ),  # TODO Networkswitch
            callback=self._fsp_unit_hw_health_state_update,
        )

        unit_policy = PolicyConfig(
            name="fspUnitAggregateHealthState",
            members=[],
            policy=FspUnitAggregationPolicy(
                logger=self.logger, fqdn=self.fqdn, fsp_fqdn=self._fsp_fqdn
            ),
        )

        return [hw_policy, unit_policy]

    def device_health_state_callback(
        self, health_info: tango.DevString
    ) -> None:
        """
        Forward new health info to the device.

        :param health_info: the health info string; ``None`` is forwarded
            as an empty string
        """
        self._device.health_info_signal = (
            "" if health_info is None else health_info
        )
        # Set HW to OK for now until we can integrate it
        self._device.health_state_hw_signal = HealthState.OK

    def _fsp_resource_state_update(
        self: FspUnitComponentManager, health_state: HealthState
    ) -> None:
        """
        Record the latest Fsp health state.

        :param health_state: the new health state
        """
        # NOTE(review): stores the enum member's name (a str), whereas
        # __init__ stores a HealthState member - confirm what readers of
        # this attribute expect.
        self.health_state_fsp = health_state.name

    def _fsp_unit_hw_health_state_update(
        self: FspUnitComponentManager, health_state: HealthState
    ) -> None:
        """
        Record the latest hardware health state; registered as the callback
        of the hardware health policy.

        :param health_state: the new health state
        """
        # NOTE(review): stores the enum member's name (a str), whereas
        # __init__ stores a HealthState member - confirm what readers of
        # this attribute expect.
        self.health_state_hw = health_state.name

    # --- Resource Status -- #

    @property
    def resource_status(self) -> str:
        """
        Getter Function for resource_status

        :return: A JSON string representation of FSP's Resource Status
        :rtype: str
        """
        return self.get_resource_status()

    def get_resource_status(self: FspUnitComponentManager) -> str:
        """
        Read the resource status from the Fsp device and format it as a
        JSON string.

        The result is a JSON object with one entry, keyed by the FSP ID
        (the last ``/``-separated component of the Fsp device name), whose
        value holds:

        * used_by_subarrays: the Fsp's ``subarrayMembership`` as ints
        * fsp_mode: name of the Fsp's ``obsMode``
        * health_state: name of the Fsp's ``healthState``
        * admin_mode: name of the Fsp's ``adminMode``
        * fsp_unit_id: this FSP Unit's ID

        Requires the Fsp proxy to have been created (see
        :meth:`start_communicating`).

        :return: a JSON string
        :rtype: str
        """
        fsp_proxy = self.proxy_dict[const.FSP]
        fsp_id = int(fsp_proxy.dev_name().split("/")[-1])

        fsp_status = {
            # The attribute return a ndarray of int64, which is not
            # compatible with orjson.dumps
            "used_by_subarrays": [
                int(i) for i in fsp_proxy.subarrayMembership
            ],
            "fsp_mode": ObsMode(fsp_proxy.obsMode).name,
            "health_state": HealthState(fsp_proxy.healthState).name,
            "admin_mode": AdminMode(fsp_proxy.adminMode).name,
            "fsp_unit_id": self._device_id,
        }

        # fsp_id is an int key, hence OPT_NON_STR_KEYS.
        return orjson.dumps(
            {fsp_id: fsp_status}, option=orjson.OPT_NON_STR_KEYS
        ).decode()