# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.
from __future__ import annotations
import traceback
from typing import Any, List
import orjson
# tango imports
import tango
from ska_control_model import AdminMode, HealthState, ObsMode, PowerState
from ska_mid_cbf_common.health.health_monitor_component import (
HealthMonitorComponent,
)
from ska_mid_cbf_common.health.policy_configurations import PolicyConfig
from ska_tango_testing import context
from ska_mid_cbf_mcs.base.component_manager import (
CbfComponentManager,
CommunicationStatus,
)
from ska_mid_cbf_mcs.commons.global_enum import const
from ska_mid_cbf_mcs.fsp_unit.fsp_unit_health_policies import (
FHS_MONITOR_TEMPLATE,
FSP_HW_POLICY_NAME,
FspHardwareAggregationPolicy,
FspUnitAggregationPolicy,
)
__all__ = ["FspUnitComponentManager"]
# NOTE: stray "[docs]" link text from the Sphinx source view removed here.
class FspUnitComponentManager(CbfComponentManager):
    """
    Component manager for Fsp Unit.

    Holds device proxies to one Fsp device and to the FHSMonitor devices of
    the unit, subscribes their ``healthState`` attributes to a
    HealthMonitorComponent, and reports the Fsp resource status as JSON.
    """

    def __init__(
        self: FspUnitComponentManager,
        *args: Any,
        fsp_fqdn: str,
        fhs_monitor_fqdns: List[str],
        device_id: int,
        **kwargs: Any,
    ) -> None:
        """
        Initialize a new instance.

        :param fsp_fqdn: FQDN of the Fsp device
        :param fhs_monitor_fqdns: FQDNs of the FHSMonitor devices
        :param device_id: ID of this Fsp Unit; reported as ``fsp_unit_id``
            by :meth:`get_resource_status`
        """
        # Base-class setup comes first: self._device and self.logger are
        # used below (presumably provided by CbfComponentManager).
        super().__init__(*args, **kwargs)

        # --- Attribute Values --- #
        self._fsp_fqdn = fsp_fqdn
        self._fhs_monitor_fqdns = fhs_monitor_fqdns
        self.fqdn = self._device.get_name()
        self._device_id = device_id

        # Proxies are created lazily by _create_device_proxy(); until then
        # both entries are empty dicts. Afterwards const.FSP maps to a single
        # proxy and const.FHS_MONITOR maps 1-based index -> proxy.
        self.proxy_dict = {
            const.FHS_MONITOR: {},
            const.FSP: {},
        }

        # HealthState
        # NOTE(review): initialised with HealthState members, but the update
        # callbacks below overwrite these with the member *name* (a str).
        self.health_state_hw = HealthState.UNKNOWN
        self.health_state_fsp = HealthState.UNKNOWN
        self.num_fhs_controllers_per_unit = len(self._fhs_monitor_fqdns)

        # Health Monitor Component
        self.health_monitor = HealthMonitorComponent(
            fqdn=self.fqdn,
            policies=self._fsp_unit_health_state_policy(),
            update_health_info_callback=self.device_health_state_callback,
            update_health_state_callback=self._push_health_state_update,
            logger=self.logger,
        )
        self.logger.info("FSP Unit Init complete")

    def _create_device_proxy(
        self: FspUnitComponentManager,
    ) -> bool:
        """
        Create the device proxies and subscribe to their healthState.

        A proxy is created for the Fsp device first, then one per FHSMonitor
        FQDN (keyed by 1-based position in ``fhs_monitor_fqdns``). Each proxy
        is registered with the health monitor for ``healthState`` events.
        The first failure is logged and aborts the remaining work.

        :return: True if every proxy was created and subscribed, False
            otherwise
        """
        self.logger.info(f"Creating proxy to {self._fsp_fqdn}")
        try:
            self.proxy_dict[const.FSP] = context.DeviceProxy(
                device_name=self._fsp_fqdn
            )
            self.logger.info(f"Created proxy for {self._fsp_fqdn}")

            # Then subscribe to the HealthState of the device
            self.health_monitor.subscribe_to_events(
                proxy=self.proxy_dict[const.FSP],
                attr_name="healthState",
                expected_enum_type=HealthState,
            )
            self.logger.info(f"Subscribed to HMC for {self._fsp_fqdn}")
        except tango.DevFailed as dev_failed:
            self.logger.error(
                f"Failed to connect to {self._fsp_fqdn}; {dev_failed}"
            )
            return False
        except Exception as exception:
            self.logger.error(f"{traceback.format_exc()}: {exception}")
            return False

        for device_id, fqdn in enumerate(self._fhs_monitor_fqdns, 1):
            try:
                self.proxy_dict[const.FHS_MONITOR][device_id] = (
                    context.DeviceProxy(
                        device_name=fqdn,
                    )
                )
                self.logger.info(f"Created proxy for {fqdn}")

                # Then subscribe to the HealthState of the device
                self.health_monitor.subscribe_to_events(
                    proxy=self.proxy_dict[const.FHS_MONITOR][device_id],
                    attr_name="healthState",
                    expected_enum_type=HealthState,
                )
                self.logger.info(f"Subscribed to HMC for {fqdn}")
            except tango.DevFailed as dev_failed:
                self.logger.error(f"Failed to connect to {fqdn}; {dev_failed}")
                return False
            except Exception as exception:
                self.logger.error(f"{traceback.format_exc()}: {exception}")
                return False

        return True

    def start_communicating(
        self: FspUnitComponentManager,
        *args: Any,
        admin_mode: AdminMode,
        **kwargs: Any,
    ) -> None:
        """
        Establish communication with the component, then start monitoring.

        Creates the device proxies (unless already communicating), writes
        ``admin_mode`` to the Fsp device, delegates to the base class and
        finally reports the component as powered ON. On any failure the
        error is logged, the communication state is set to NOT_ESTABLISHED
        and the method returns early.

        :param admin_mode: admin mode to write to the Fsp device and to pass
            on to the base class
        """
        self.logger.info(
            "Entering fsp_unit_component_manager.start_communicating"
        )

        # No need to re-create proxies if we already created them previously
        if self.is_communicating:
            self.logger.info(
                "Skipped creating proxies as they are already created"
            )
        elif self._create_device_proxy():
            # Only report success when creation actually succeeded.
            self.logger.info("Created proxies")
        else:
            self.logger.error("Failed to initialize proxies.")
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            return

        try:
            self.proxy_dict[const.FSP].adminMode = admin_mode
        except tango.DevFailed as dev_failed:
            self.logger.error(
                f"Failed to set {self._fsp_fqdn} to {admin_mode}; {dev_failed}"
            )
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            return
        except Exception as exception:
            self.logger.error(f"{traceback.format_exc()}: {exception}")
            # Same outcome as the DevFailed branch: the admin mode was not
            # applied, so communication is not established.
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            return

        super().start_communicating(admin_mode=admin_mode)
        self._update_component_state(power=PowerState.ON)
        self.logger.info("FSP Unit Start communication complete")

    def stop_communicating(
        self: FspUnitComponentManager,
        *args: Any,
        admin_mode: AdminMode,
        **kwargs: Any,
    ) -> None:
        """
        Stop monitoring the component and break off communication.

        Unsubscribes the Fsp ``healthState`` events, writes ``admin_mode``
        to the Fsp device, unsubscribes the FHSMonitor ``healthState``
        events and then delegates to the base class. Any failure is logged
        and ends the method before the base class is called.

        :param admin_mode: admin mode to write to the Fsp device and to pass
            on to the base class
        """
        self.logger.info(
            "Entering fsp_unit_component_manager.stop_communicating"
        )

        try:
            self.health_monitor.unsubscribe_from_events(
                proxy=self.proxy_dict[const.FSP], attr_names=["healthState"]
            )
            self.proxy_dict[const.FSP].adminMode = admin_mode
        except tango.DevFailed as dev_failed:
            self.logger.error(
                f"Failed to set {self._fsp_fqdn} to {admin_mode}; {dev_failed}"
            )
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            return
        except Exception as exception:
            # Communication state is deliberately left untouched here, as in
            # the original code; only the stray ``False`` return is dropped.
            self.logger.error(f"{traceback.format_exc()}: {exception}")
            return

        try:
            for proxy in self.proxy_dict[const.FHS_MONITOR].values():
                self.health_monitor.unsubscribe_from_events(
                    proxy=proxy,
                    attr_names=["healthState"],
                )
        except tango.DevFailed as dev_failed:
            self.logger.error(
                f"Failed to unsubscribe healthState from {const.FHS_MONITOR}"
            )
            self.logger.error(f"Dev Exception: {dev_failed}")
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )
            return
        except Exception as exception:
            self.logger.error(f"{traceback.format_exc()}: {exception}")
            return

        super().stop_communicating(admin_mode=admin_mode)
        self.logger.info("FSP Unit Stop communication complete")

    def _handle_admin_mode_callback(
        self: FspUnitComponentManager,
        admin_mode: AdminMode,
    ) -> None:
        """
        Placeholder for reacting to an admin mode change; only logs for now.

        :param admin_mode: the reported admin mode (currently unused)
        """
        # TODO: Will be implemented in a future story
        self.logger.info("Handle fhs host admin mode")

    # --- Health Status -- #

    def _fsp_unit_health_state_policy(
        self: FspUnitComponentManager,
    ) -> List[PolicyConfig]:
        """
        Build the health state policies handed to the HealthMonitorComponent.

        Two policies are produced, in this order:

        * the hardware policy (``FSP_HW_POLICY_NAME``), whose members are
          ``FHS_MONITOR_TEMPLATE`` formatted for sequence numbers
          ``1..num_fhs_controllers_per_unit`` and whose result is delivered
          to :meth:`_fsp_unit_hw_health_state_update`;
        * the ``fspUnitAggregateHealthState`` policy, with no members and no
          callback.

        :return: the list of policy configurations
        """
        hw_members = [
            FHS_MONITOR_TEMPLATE.format(fhs_seq=fhs_seq)
            for fhs_seq in range(1, self.num_fhs_controllers_per_unit + 1)
        ]

        return [
            PolicyConfig(
                name=FSP_HW_POLICY_NAME,
                members=hw_members,
                policy=FspHardwareAggregationPolicy(
                    self.logger,
                    fhs_monitor_fqdns=self._fhs_monitor_fqdns,
                    network_switch_fqdn="",  # TODO Networkswitch
                ),
                callback=self._fsp_unit_hw_health_state_update,
            ),
            PolicyConfig(
                name="fspUnitAggregateHealthState",
                members=[],
                policy=FspUnitAggregationPolicy(
                    logger=self.logger, fqdn=self.fqdn, fsp_fqdn=self._fsp_fqdn
                ),
            ),
        ]

    def device_health_state_callback(
        self: FspUnitComponentManager, health_info: tango.DevString
    ) -> None:
        """
        Forward new health info to the device.

        Registered as the ``update_health_info_callback`` of the health
        monitor in ``__init__``.

        :param health_info: the health info string; ``None`` is forwarded as
            an empty string
        """
        self._device.health_info_signal = (
            "" if health_info is None else health_info
        )
        # Set HW to OK for now until we can integrate it
        self._device.health_state_hw_signal = HealthState.OK

    def _fsp_resource_state_update(
        self: FspUnitComponentManager, health_state: HealthState
    ) -> None:
        """
        Record the name of the latest Fsp health state.

        NOTE(review): not referenced anywhere else in this class - possibly
        intended as a policy callback; confirm before removing.

        :param health_state: the new health state; its ``name`` is stored in
            ``health_state_fsp``
        """
        self.health_state_fsp = health_state.name

    def _fsp_unit_hw_health_state_update(
        self: FspUnitComponentManager, health_state: HealthState
    ) -> None:
        """
        Record the name of the latest hardware health state.

        Registered as the callback of the hardware policy built in
        :meth:`_fsp_unit_health_state_policy`.

        :param health_state: the new health state; its ``name`` is stored in
            ``health_state_hw``
        """
        self.health_state_hw = health_state.name

    # --- Resource Status -- #

    @property
    def resource_status(self) -> str:
        """
        Getter Function for resource_status

        :return: A JSON string representation of FSP's Resource Status
        :rtype: str
        """
        return self.get_resource_status()

    def get_resource_status(self: FspUnitComponentManager) -> str:
        """
        Retrieve the resource status of the Fsp device as a JSON string.

        The result is a single-entry JSON object keyed by the Fsp ID (the
        last ``/``-separated component of the proxy's device name), whose
        value holds:

        * used_by_subarrays: The subarrays that is/are using the FSP
        * fsp_mode: name of the ObsMode of the FSP
        * health_state: name of the HealthState of the FSP
        * admin_mode: name of the AdminMode of the FSP
        * fsp_unit_id: This FSP Unit's ID

        :return: a JSON string
        :rtype: str
        """
        fsp_proxy = self.proxy_dict[const.FSP]
        fsp_id = int(fsp_proxy.dev_name().split("/")[-1])

        fsp_status = {
            # The attribute return a ndarray of int64, which is not
            # compatible with orjson.dumps
            "used_by_subarrays": [
                int(i) for i in fsp_proxy.subarrayMembership
            ],
            "fsp_mode": ObsMode(fsp_proxy.obsMode).name,
            "health_state": HealthState(fsp_proxy.healthState).name,
            "admin_mode": AdminMode(fsp_proxy.adminMode).name,
            "fsp_unit_id": self._device_id,
        }

        # fsp_id is an int key, hence OPT_NON_STR_KEYS.
        return orjson.dumps(
            {fsp_id: fsp_status}, option=orjson.OPT_NON_STR_KEYS
        ).decode()