# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.
from __future__ import annotations
from functools import partial
from typing import Any, List
import orjson
# tango imports
import tango
from ska_control_model import AdminMode, HealthState, PowerState
from ska_mid_cbf_common import HealthMonitorComponent, PolicyConfig
from ska_tango_testing import context
from ska_mid_cbf_mcs.base.component_manager import (
CbfComponentManager,
CommunicationStatus,
)
from ska_mid_cbf_mcs.commons.global_enum import const
from ska_mid_cbf_mcs.vcc_unit.vcc_unit_healthstate_policy import (
VCC_ALL_BANDS_CONTROLLER_TEMPLATE,
VCC_HARDWARE_ATTR,
VCC_RESOURCE_TEMPLATE,
VccHardwareAggregationPolicy,
VccResourceAggregationPolicy,
VccUnitAggregationPolicy,
)
__all__ = ["VccUnitComponentManager"]
[docs]
class VccUnitComponentManager(CbfComponentManager):
"""
Component manager for Vcc Unit.
"""
def __init__(
self: VccUnitComponentManager,
*args: Any,
mid_cbf_controller_fqdn: str,
fhs_monitor_fqdn: str,
vcc_all_bands_fqdns: List[str],
device_id: int,
**kwargs: Any,
) -> None:
"""
Initialize a new instance.
:param mid_cbf_controller_fqdn: FQDN of the mid CBF controller
:param fhs_host_controller_fqdn: FQDN of the FHS host controller
:param vcc_all_bands_fqdns: FQDNs of VCC band devices
:param device_id: the device id of the MCS VCC Device
"""
# supply operating state machine trigger keywords
super().__init__(*args, **kwargs)
# --- Attribute Values --- #
# HealthState of the hardware
self._health_state_hw = HealthState.UNKNOWN
# HealthState of the VCC which aggregates the Signal processing and
# hardware HealthState
self._health_states_vccs = {}
self._num_vcc_per_unit = len(vcc_all_bands_fqdns)
for index in range(1, self._num_vcc_per_unit + 1):
self._health_states_vccs[index] = HealthState.UNKNOWN
self._mid_cbf_controller_fqdn = mid_cbf_controller_fqdn
self._fhs_monitor_fqdn = fhs_monitor_fqdn
self._vcc_all_bands_fqdns = vcc_all_bands_fqdns
self._device_id = device_id
# Setup the Health Monitor Component
self._fqdn = self._device.get_name()
self._hmc_vcc_unit = self._setup_vcc_unit_health_monitor_component()
self._health_state_vccs = {}
def _create_device_proxy(
self: VccUnitComponentManager,
) -> None:
# This function should be called to setup the proxy when the communication
# is set online
try:
self.proxy_dict["controller"] = context.DeviceProxy(
device_name=self._mid_cbf_controller_fqdn,
)
self.proxy_dict[const.FHS_MONITOR] = context.DeviceProxy(
device_name=self._fhs_monitor_fqdn
)
self.proxy_dict[const.VCC_ALL_BANDS] = {}
self._hmc_vcc_unit.subscribe_to_events(
proxy=self.proxy_dict[const.FHS_MONITOR],
attr_name="healthState",
expected_enum_type=HealthState,
)
# Note: There is a distinction between vcc_seq here and the VCC ID of the device.
# - vcc_seq is the sequence index relative to the specific VCC Unit
# such that each VCC Unit has VCC 1-6
# - VCC ID is absolute ID value of a VCC device within Mid.CBF ranging from 1-197
# Example: VCC Unit 2's vcc_seq 1 could contain VCC with a VCC_ID of 7
for vcc_seq, fqdn in enumerate(self._vcc_all_bands_fqdns, 1):
# First create the proxy
self.proxy_dict[const.VCC_ALL_BANDS][vcc_seq] = (
context.DeviceProxy(
device_name=fqdn,
)
)
# Then subscribe to the HealthState of the device
self._hmc_vcc_unit.subscribe_to_events(
proxy=self.proxy_dict[const.VCC_ALL_BANDS][vcc_seq],
attr_name="healthState",
expected_enum_type=HealthState,
)
except tango.DevFailed as dev_failed:
self.logger.error(f"{dev_failed}")
return False
return True
[docs]
def start_communicating(
self: VccUnitComponentManager,
*args: Any,
admin_mode: AdminMode,
**kwargs: Any,
) -> None:
"""
Establish communication with the component, then start monitoring.
"""
# No need to re-create proxies if we already created them previously
if self.is_communicating:
self.logger.info(
"Skipped creating proxies as they are already created"
)
success = True
else:
success = self._create_device_proxy()
self.logger.info("Created proxies")
# Update healthStatesVcc attribute signal
self._device.health_states_vcc_signal = orjson.dumps(
self._health_states_vccs, option=orjson.OPT_NON_STR_KEYS
).decode()
if not success:
self.logger.error("Failed to initialize proxies.")
self._update_communication_state(
communication_state=CommunicationStatus.NOT_ESTABLISHED
)
return
if admin_mode == AdminMode.ENGINEERING:
for _, proxy in self.proxy_dict[const.VCC_ALL_BANDS].items():
subarray_id = proxy.subarrayID
if subarray_id != 0:
vcc_seq = int(proxy.dev_name().split("/")[-1])
self.logger.error(
f"Failed to set the admin mode to {admin_mode.name}. VCC {vcc_seq} is assigned to subarray {subarray_id}."
)
return
super().start_communicating(admin_mode=admin_mode)
self._update_component_state(power=PowerState.ON)
[docs]
def stop_communicating(
self: VccUnitComponentManager,
*args: Any,
admin_mode: AdminMode,
**kwargs: Any,
) -> None:
"""
Thread for stop_communicating operation.
"""
self._unsubscribe_all_events()
super().stop_communicating(admin_mode=admin_mode)
def _unsubscribe_all_events(self):
"""
Unsubscribe from all events the VCC Unit has subscribed to previously
"""
self._hmc_vcc_unit.unsubscribe_from_events(
proxy=self.proxy_dict[const.FHS_MONITOR],
attr_names=["healthState"],
)
for vcc_seq, _ in enumerate(self._vcc_all_bands_fqdns, 1):
self._hmc_vcc_unit.unsubscribe_from_events(
proxy=self.proxy_dict[const.VCC_ALL_BANDS][vcc_seq],
attr_names=["healthState"],
)
def _handle_vcc_all_bands_health_state_callback(
self: VccUnitComponentManager,
vcc_all_bands_id: int,
health_state: HealthState,
) -> None:
# TODO: Will be implemented in a future story as part of CIP-3599
self.logger.info("Handle VCC all bands health state")
def _fhs_host_health_state_callback(
self: VccUnitComponentManager,
health_state: HealthState,
) -> None:
# TODO: Will be implemented in a future story as part of CIP-3599
self.logger.info("Handle fhs host health state")
def _handle_admin_mode_callback(
self: VccUnitComponentManager,
admin_mode: AdminMode,
) -> None:
# TODO: Will be implemented in a future story as part of CIP-3599
self.logger.info("Handle admin mode")
# --- Resource Status -- #
@property
def resource_status(self) -> str:
"""
Getter Function for resource_status
When the variable is called, updates _resource_status and return its value
:return: A JSON string representation of VCC's Resource Status
:rtype: str
"""
return self.get_resource_status()
[docs]
def get_resource_status(self: VccUnitComponentManager) -> str:
"""
Retrieves the the resource status from all available VCC devices and
format it into a ResourceStatus VCC Status Object JSON string.
Retrieves the following from the VCC devices:
* used_by_subarray: The subarray that is using the VCC
* health_state: HealthState of the VCC
* admin_mode: AdminMode of the VCC
* vcc_unit_id: This VCC Unit's ID
:param vcc_unit_id: Device ID of the this VCC Unit
:return: a JSON string
:rtype: str
"""
vcc_rs_dict = {}
for _, vcc_proxy in (self.proxy_dict[const.VCC_ALL_BANDS]).items():
vcc_id = int(vcc_proxy.dev_name().split("/")[-1])
vcc_rs_dict[vcc_id] = {}
used_by_subarray = []
# For Unit and Integration testing:
# We need to store sub_id in a variable since the sim override value
# can only be retrieved once.
sub_id = vcc_proxy.subarrayID
# By default, FHS VCC sets subarrayID to 0 if the VCC is not used by
# any subarrays. 0 is an invalid value for resourceStatus schema.
if sub_id != 0:
used_by_subarray.append(sub_id)
vcc_rs_dict[vcc_id]["used_by_subarray"] = used_by_subarray
vcc_rs_dict[vcc_id]["health_state"] = HealthState(
vcc_proxy.healthState
).name
vcc_rs_dict[vcc_id]["admin_mode"] = AdminMode(
vcc_proxy.adminMode
).name
vcc_rs_dict[vcc_id]["vcc_unit_id"] = self._device_id
return orjson.dumps(
vcc_rs_dict, option=orjson.OPT_NON_STR_KEYS
).decode()
# Health Monitor Component
# --- VCC Unit Aggregate Health Monitor Component --- #
def _vcc_signal_path_healthstate_attr(self) -> list[str]:
"""
Generate a list of all VCC Allbands Controller's HealthState to be monitor by HMC
"""
attr_list = []
for vcc_seq in range(1, self._num_vcc_per_unit + 1):
attr_name = VCC_ALL_BANDS_CONTROLLER_TEMPLATE.format(
vcc_seq=vcc_seq
)
attr_list.append(attr_name)
return attr_list
# TODO: Curently this is only returning the internal_state.
# ADR-128 decision will change this to have a detail report on why a device
# failed
def _vcc_unit_aggregate_health_state_info_callback(
self, health_info: tango.DevString
):
if health_info is None:
self._device.health_info_signal = ""
else:
self._device.health_info_signal = health_info
def _vcc_unit_health_state_policy(self) -> str:
"""
Generate all the policies required for the VCC Unit HMC
Note that the sequence we add to the health_state_policies list determines
the order that the policies will be processed
"""
health_state_policies = []
health_state_policies += self._gen_vcc_hardware_health_state_policy()
health_state_policies += self._gen_vcc_resource_health_state_policy()
health_state_policies += self._gen_vcc_unit_agg_health_state_policy()
return health_state_policies
def _setup_vcc_unit_health_monitor_component(
self,
) -> HealthMonitorComponent:
return HealthMonitorComponent(
fqdn=self._fqdn,
policies=self._vcc_unit_health_state_policy(),
update_health_info_callback=self._vcc_unit_aggregate_health_state_info_callback,
update_health_state_callback=self._push_health_state_update,
logger=self.logger,
)
# --- VCC Hardware Aggregate Health Monitor Component --- #
def _gen_vcc_hardware_health_state_policy(self) -> list[PolicyConfig]:
"""
Generate the health state policy for VCC Hardware Health State Policy
"""
health_state_policies = [
# No Members given.
# VccHardwareAggregationPolicy hardcoded to look for FHS Monitor and Network Switch HealthState
# in the the internal_state
PolicyConfig(
name=VCC_HARDWARE_ATTR,
members=[],
policy=VccHardwareAggregationPolicy(
self.logger,
self._fqdn,
fhs_monitor_fqdn=self._fhs_monitor_fqdn,
network_switch_fqdn="", # TODO Add the Network Switch FQDN when defined
),
callback=self._update_vcc_unit_hw_health_state,
),
]
return health_state_policies
def _gen_vcc_unit_agg_health_state_policy(self) -> list[PolicyConfig]:
"""
Generate the health state policy for VCC Aggregate Health State Policy
"""
health_state_policies = [
# VccUnitAggregationPolicy also hardcoded to look for VCC HW HealthState in the internal_state
PolicyConfig(
name="vccUnitAggregateHealthState",
members=self._vcc_signal_path_healthstate_attr(),
policy=VccUnitAggregationPolicy(self.logger, self._fqdn),
),
]
return health_state_policies
def _gen_vcc_resource_health_state_policy(
self: VccUnitComponentManager,
) -> list[PolicyConfig]:
"""
Generate the health state policy for VCC Resource Health State Policy
"""
health_state_policies = []
for vcc_seq in range(1, self._num_vcc_per_unit + 1):
health_state_policies.append(
# VccResourceAggregationPolicy is also hardcoded to look
# for a the VCC HW HealthState in the internal_state.
PolicyConfig(
name=VCC_RESOURCE_TEMPLATE.format(vcc_seq=vcc_seq),
members=[
VCC_ALL_BANDS_CONTROLLER_TEMPLATE.format(
vcc_seq=vcc_seq
)
],
policy=VccResourceAggregationPolicy(
self.logger, self._fqdn
),
callback=partial(
self._update_vcc_resource_state, vcc_seq=vcc_seq
),
)
)
return health_state_policies
def _update_vcc_unit_hw_health_state(
self: VccUnitComponentManager, health_state: HealthState
) -> None:
"""
Push a health state update to the device.
:param health_state: the new health state of the component manager.
"""
self._health_state_vccs[VCC_HARDWARE_ATTR] = health_state.name
self._device.health_state_hw_signal = health_state
def _update_vcc_resource_state(
self: VccUnitComponentManager, health_state: HealthState, vcc_seq: int
) -> None:
"""
Push a health state update to the device.
:param health_state: the new health state of the component manager.
:param vcc_seq: The sequence index (relative to this VCC Unit) of the VCC Resource for this callback
"""
self._health_state_vccs[
VCC_RESOURCE_TEMPLATE.format(vcc_seq=vcc_seq)
] = health_state.name