Source code for ska_mid_cbf_mcs.fsp_unit.fsp_unit_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

from __future__ import annotations

from typing import Any, List

import orjson

# tango imports
import tango
from ska_control_model import AdminMode, HealthState, ObsMode, PowerState
from ska_tango_testing import context

from ska_mid_cbf_mcs.base.component_manager import (
    CbfComponentManager,
    CommunicationStatus,
)
from ska_mid_cbf_mcs.commons.global_enum import const

__all__ = ["FspUnitComponentManager"]


class FspUnitComponentManager(CbfComponentManager):
    """
    Component manager for Fsp Unit.

    Holds a proxy to a single Fsp device, forwards admin mode changes to
    it when communication is started or stopped, and reports that Fsp's
    resource status as a JSON string.
    """

    def __init__(
        self: FspUnitComponentManager,
        *args: Any,
        fsp_fqdn: str,
        fhs_host_controller_fqdns: List[str],
        device_id: int,
        **kwargs: Any,
    ) -> None:
        """
        Initialize a new instance.

        :param fsp_fqdn: FQDN of the Fsp device
        :param fhs_host_controller_fqdns: FQDNs of the FHSHostController
            devices
        :param device_id: ID of this Fsp Unit; reported as ``fsp_unit_id``
            in the resource status
        """
        # Remaining positional/keyword arguments belong to the base class.
        super().__init__(*args, **kwargs)

        # --- Attribute Values --- #

        # HealthState of the hardware
        self.health_state_hw = HealthState.UNKNOWN

        self._fsp_fqdn = fsp_fqdn
        self._fhs_host_controller_fqdns = fhs_host_controller_fqdns
        self._device_id = device_id

        # Each entry starts as an empty dict placeholder; the FSP entry is
        # replaced by a device proxy in _create_device_proxy().
        self.proxy_dict = {
            const.FHS_HOST_CONTROLLER: {},
            const.FSP: {},
        }

    def _create_device_proxy(
        self: FspUnitComponentManager,
    ) -> bool:
        """
        Initialize device proxies.

        :return: True if the proxy to the Fsp device was created, False if
            the connection raised ``tango.DevFailed``
        """
        self.logger.info(f"Creating proxy to {self._fsp_fqdn}")
        try:
            self.proxy_dict[const.FSP] = context.DeviceProxy(
                device_name=self._fsp_fqdn
            )
        except tango.DevFailed as dev_failed:
            self.logger.error(
                f"Failed to connect to {self._fsp_fqdn}; {dev_failed}"
            )
            return False

        # TODO: FHS host devices are not implemented nor are they simulated
        # they are not in scope for M1 as they are hardware monitoring and control
        # Therefore, the connection is temporary commented out
        #
        # for device_id, fqdn in enumerate(self._fhs_host_controller_fqdns, 1):
        #     try:
        #         proxy = context.DeviceProxy(device_name=fqdn)
        #         self.proxy_dict[const.FHS_HOST_CONTROLLER][device_id] = proxy
        #     except tango.DevFailed as dev_failed:
        #         self.logger.error(f"Failed to connect to {fqdn}; {dev_failed}")
        #         return False

        return True

    def start_communicating(
        self: FspUnitComponentManager,
        *args: Any,
        admin_mode: AdminMode,
        **kwargs: Any,
    ) -> None:
        """
        Establish communication with the component, then start monitoring.

        Creates the Fsp proxy and writes ``admin_mode`` to it before
        delegating to the base class. If either step fails, the
        communication state is set to NOT_ESTABLISHED and the base class
        is not invoked.

        :param admin_mode: admin mode to propagate to the Fsp device
        """
        self.logger.info(
            "Entering fsp_unit_component_manager.start_communicating"
        )

        success = self._create_device_proxy()
        if not success:
            self.logger.error("Failed to initialize proxies.")
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            return

        try:
            self.proxy_dict[const.FSP].adminMode = admin_mode
        except tango.DevFailed as dev_failed:
            self.logger.error(
                f"Failed to set {self._fsp_fqdn} to {admin_mode}; {dev_failed}"
            )
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            return

        super().start_communicating(admin_mode=admin_mode)
        self._update_component_state(power=PowerState.ON)

    def stop_communicating(
        self: FspUnitComponentManager,
        *args: Any,
        admin_mode: AdminMode,
        **kwargs: Any,
    ) -> None:
        """
        Propagate the admin mode to the Fsp device, then stop communicating.

        If the Fsp proxy was never created (start_communicating did not
        succeed), there is no device to update, so the remote write is
        skipped and only the base class teardown runs.

        :param admin_mode: admin mode to propagate to the Fsp device
        """
        fsp_proxy = self.proxy_dict[const.FSP]

        # The entry is still the empty dict placeholder from __init__ when
        # no proxy exists; writing adminMode on it would raise
        # AttributeError, which the DevFailed handler does not catch.
        if isinstance(fsp_proxy, dict):
            self.logger.warning(
                f"No proxy to {self._fsp_fqdn}; skipping admin mode update"
            )
        else:
            try:
                fsp_proxy.adminMode = admin_mode
            except tango.DevFailed as dev_failed:
                self.logger.error(
                    f"Failed to set {self._fsp_fqdn} to {admin_mode}; {dev_failed}"
                )
                self._update_communication_state(
                    communication_state=CommunicationStatus.NOT_ESTABLISHED
                )
                return

        super().stop_communicating(admin_mode=admin_mode)

    def _fhs_host_health_state_callback(
        self: FspUnitComponentManager,
        health_state: HealthState,
    ) -> None:
        """
        Placeholder callback for FHS host health state changes.

        :param health_state: reported health state (currently unused)
        """
        # TODO: Will be implemented in a future story
        self.logger.info("Handle fhs host health state")

    def _handle_admin_mode_callback(
        self: FspUnitComponentManager,
        admin_mode: AdminMode,
    ) -> None:
        """
        Placeholder callback for FHS host admin mode changes.

        :param admin_mode: reported admin mode (currently unused)
        """
        # TODO: Will be implemented in a future story
        self.logger.info("Handle fhs host admin mode")

    # --- Resource Status -- #

    @property
    def resource_status(self) -> str:
        """
        Getter Function for resource_status

        :return: A JSON string representation of FSP's Resource Status
        :rtype: str
        """
        return self.get_resource_status()

    def get_resource_status(self: FspUnitComponentManager) -> str:
        """
        Retrieve the resource status of the Fsp device and format it as a
        JSON string keyed by the Fsp ID.

        The Fsp ID is taken from the last ``/``-separated member of the
        proxy's device name. The entry for that ID contains:

        * used_by_subarrays: the Fsp's ``subarrayMembership`` as a list of
          ints
        * fsp_mode: name of the Fsp's ``obsMode``
        * health_state: name of the Fsp's ``healthState``
        * admin_mode: name of the Fsp's ``adminMode``
        * fsp_unit_id: this Fsp Unit's device ID

        The Fsp proxy must already have been created by
        start_communicating; no proxy errors are handled here.

        :return: a JSON string
        :rtype: str
        """
        fsp_proxy = self.proxy_dict[const.FSP]
        fsp_id = int(fsp_proxy.dev_name().split("/")[-1])

        # The attribute return a ndarray of int64, which is not compatible
        # with orjson.dumps, so convert each member to a plain int.
        subarray_membership = [int(i) for i in fsp_proxy.subarrayMembership]

        fsp_rs_dict = {
            fsp_id: {
                "used_by_subarrays": subarray_membership,
                "fsp_mode": ObsMode(fsp_proxy.obsMode).name,
                "health_state": HealthState(fsp_proxy.healthState).name,
                "admin_mode": AdminMode(fsp_proxy.adminMode).name,
                "fsp_unit_id": self._device_id,
            }
        }

        # The top-level key is an int, which orjson only accepts with
        # OPT_NON_STR_KEYS.
        return orjson.dumps(
            fsp_rs_dict, option=orjson.OPT_NON_STR_KEYS
        ).decode()